kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1410088 - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/log/ core/src/main/scala/kafka/server/ core/src/main/scala/kafka/utils/ core/src/test/scala/other/kafka/ core/src/test/scala/unit/kafka/integration/ core/src/test/scala/...
Date Thu, 15 Nov 2012 22:54:47 GMT
Author: jkreps
Date: Thu Nov 15 22:54:45 2012
New Revision: 1410088

URL: http://svn.apache.org/viewvc?rev=1410088&view=rev
Log:
KAFKA-545 Add some log performance tests.
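
The main addition is a rework of TestLinearWriteSpeed into a throttled, periodically reporting disk benchmark that can drive either FileChannel writes or an mmap'd file. A sketch of an invocation (the flags are those defined in the diff below; the directory and sizes are only illustrative):

    // Illustrative: write 1GB in 4KB chunks to one mmap'd file, reporting each second.
    kafka.TestLinearWriteSpeed.main(Array(
      "--dir", "/tmp/kafka-perf",   // defaults to java.io.tmpdir
      "--bytes", "1073741824",
      "--size", "4096",
      "--files", "1",
      "--reporting-interval", "1000",
      "--mmap"))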


Removed:
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/ExampleUtils.java
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Nov 15 22:54:45 2012
@@ -608,5 +608,8 @@ private[kafka] class Log(val dir: File, 
   def getLastFlushedTime():Long = {
     return lastflushedTime.get
   }
+  
+  override def toString() = "Log(" + this.dir + ")"
+  
 }
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Thu Nov 15 22:54:45 2012
@@ -82,7 +82,9 @@ class LogSegment(val messageSet: FileMes
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
    */
   def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
-    if(maxSize <= 0)
+    if(maxSize < 0)
+      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
+    if(maxSize == 0)
       return MessageSet.Empty
       
     val startPosition = translateOffset(startOffset)
@@ -99,6 +101,8 @@ class LogSegment(val messageSet: FileMes
           maxSize
         case Some(offset) => {
          // there is a max offset, translate it to a file position and use that to calculate the max read size
+          if(offset < startOffset)
+            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
           val mapping = translateOffset(offset)
           val endPosition = 
             if(mapping == null)
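
Taken together, the new checks give read() a sharper contract: a negative maxSize is a caller error, a zero maxSize is a legal empty read, and a maxOffset behind the start offset is a caller error. Hypothetical calls against some LogSegment instance `segment`:

    segment.read(0L, 0, None)             // returns MessageSet.Empty
    segment.read(0L, -1, None)            // throws IllegalArgumentException
    segment.read(100L, 4096, Some(50L))   // throws IllegalArgumentException
    segment.read(100L, 4096, Some(200L))  // reads at most 4096 bytes, ending before offset 200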

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu Nov 15 22:54:45 2012
@@ -159,4 +159,7 @@ class KafkaConfig private (val props: Ve
   * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
   val numReplicaFetchers = props.getInt("replica.fetchers", 1)
   
+  /* the frequency with which the high watermark is saved out to disk */
+  val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+  
  }
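
The new entry is an ordinary broker property. A minimal sketch of overriding it (the other required broker settings, such as the broker id and ZooKeeper connect string, are omitted here):

    import java.util.Properties
    val props = new Properties()
    props.put("replica.highwatermark.checkpoint.ms", "1000") // default is 5000
    val config = new kafka.server.KafkaConfig(props)         // plus the usual required props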

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Thu Nov 15 22:54:45 2012
@@ -68,7 +68,7 @@ class ReplicaManager(val config: KafkaCo
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
+      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Thu Nov 15 22:54:45 2012
@@ -34,7 +34,7 @@ class KafkaScheduler(val numThreads: Int
     }
   private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
 
-  def startup = {
+  def startup() = {
     executor = new ScheduledThreadPoolExecutor(numThreads)
     executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
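
Adding the empty parameter list follows the usual Scala convention: methods with side effects are declared and invoked with (), while pure accessors stay parameterless. A toy illustration (not from this codebase):

    class Service {
      private var running = false
      def startup(): Unit = { running = true }  // side effects: declare and call with ()
      def isRunning: Boolean = running          // pure accessor: no parens
    }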

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestEndToEndLatency.scala Thu Nov 15 22:54:45 2012
@@ -63,7 +63,7 @@ object TestEndToEndLatency {
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
     }
-    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
+    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
     producer.close()
     connector.shutdown()
     System.exit(0)
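
For clarity, the fix above moves a misplaced closing parenthesis: previously the + "ms" applied to println's Unit return value (legal Scala, via the implicit string concatenation on Any), so the unit suffix was silently discarded. Illustrated with literal values:

    // Before: println's Unit result is concatenated with "ms" and the result discarded
    println("Avg latency: " + 1.234) + "ms"    // prints "Avg latency: 1.234"
    // After: the unit suffix is part of the printed string
    println("Avg latency: " + 1.234 + "ms")    // prints "Avg latency: 1.234ms"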

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala Thu Nov 15 22:54:45 2012
@@ -20,16 +20,22 @@ package kafka
 import java.io._
 import java.nio._
 import java.nio.channels._
+import scala.math._
 import joptsimple._
 
 object TestLinearWriteSpeed {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser
+    val dirOpt = parser.accepts("dir", "The directory to write to.")
+                           .withRequiredArg
+                           .describedAs("path")
+                           .ofType(classOf[java.lang.String])
+                           .defaultsTo(System.getProperty("java.io.tmpdir"))
     val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.")
                            .withRequiredArg
                            .describedAs("num_bytes")
-                           .ofType(classOf[java.lang.Integer])
+                           .ofType(classOf[java.lang.Long])
     val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
                            .withRequiredArg
                            .describedAs("num_bytes")
@@ -39,7 +45,18 @@ object TestLinearWriteSpeed {
                            .describedAs("num_files")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1)
-    
+    val reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
+                           .withRequiredArg
+                           .describedAs("ms")
+                           .ofType(classOf[java.lang.Long])
+                           .defaultsTo(1000L)
+    val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
+                           .withRequiredArg
+                           .describedAs("mb")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(Integer.MAX_VALUE)
+    val mmapOpt = parser.accepts("mmap", "Mmap file.")
+
     val options = parser.parse(args : _*)
     
     for(arg <- List(bytesOpt, sizeOpt, filesOpt)) {
@@ -50,27 +67,84 @@ object TestLinearWriteSpeed {
       }
     }
 
-    val bytesToWrite = options.valueOf(bytesOpt).intValue
+    var bytesToWrite = options.valueOf(bytesOpt).longValue
+    val mmap = options.has(mmapOpt)
     val bufferSize = options.valueOf(sizeOpt).intValue
     val numFiles = options.valueOf(filesOpt).intValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).longValue
+    val dir = options.valueOf(dirOpt)
+    val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L
     val buffer = ByteBuffer.allocate(bufferSize)
     while(buffer.hasRemaining)
       buffer.put(123.asInstanceOf[Byte])
     
-    val channels = new Array[FileChannel](numFiles)
+    val writables = new Array[Writable](numFiles)
     for(i <- 0 until numFiles) {
-      val file = File.createTempFile("kafka-test", ".dat")
+      val file = new File(dir, "kafka-test-" + i + ".dat")
       file.deleteOnExit()
-      channels(i) = new RandomAccessFile(file, "rw").getChannel()
+      val raf = new RandomAccessFile(file, "rw")
+      raf.setLength(bytesToWrite / numFiles)
+      if(mmap)
+        writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length()))
+      else
+        writables(i) = new ChannelWritable(raf.getChannel())
     }
+    bytesToWrite = (bytesToWrite / numFiles) * numFiles
     
-    val begin = System.currentTimeMillis
-    for(i <- 0 until bytesToWrite / bufferSize) {
+    println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency"))
+    
+    val beginTest = System.nanoTime
+    var maxLatency = 0L
+    var totalLatency = 0L
+    var count = 0L
+    var written = 0L
+    var totalWritten = 0L
+    var lastReport = beginTest
+    while(totalWritten + bufferSize < bytesToWrite) {
       buffer.rewind()
-      channels(i % numFiles).write(buffer)
+      val start = System.nanoTime
+      writables((count % numFiles).toInt.abs).write(buffer)
+      val elapsed = System.nanoTime - start
+      maxLatency = max(elapsed, maxLatency)
+      totalLatency += elapsed
+      written += bufferSize
+      count += 1
+      totalWritten += bufferSize
+      if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) {
+        val elapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0)
+        val mb = written / (1024.0*1024.0)
+        println("%10.3f\t%10.3f\t%10.3f".format(mb / elapsedSecs, totalLatency / count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0)))
+        lastReport = start
+        written = 0
+        maxLatency = 0L
+        totalLatency = 0L
+      } else if(written > maxThroughputBytes) {
+        // if we have written enough, just sit out this reporting interval
+        val lastReportMs = lastReport / (1000*1000)
+        val now = System.nanoTime / (1000*1000)
+        val sleepMs = lastReportMs + reportingInterval - now
+        if(sleepMs > 0)
+          Thread.sleep(sleepMs)
+      }
+    }
+    val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0)
+    println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec")
+  }
+  
+  trait Writable {
+    def write(buffer: ByteBuffer)
+  }
+  
+  class MmapWritable(val buffer: ByteBuffer) extends Writable {
+    def write(b: ByteBuffer) {
+      buffer.put(b)
+    }
+  }
+  
+  class ChannelWritable(val channel: FileChannel) extends Writable {
+    def write(b: ByteBuffer) {
+      channel.write(b)
     }
-    val elapsedSecs = (System.currentTimeMillis - begin) / 1000.0
-    System.out.println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec")
   }
   
 }
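
The small Writable trait is what keeps the benchmark loop independent of the write path, so one loop can compare mmap against FileChannel writes. Other backends can be measured by adding an implementation; for example, a hypothetical RandomAccessFile-based variant (not part of this commit) would slot in the same way:

    // Hypothetical third backend; assumes the heap ByteBuffer allocated above.
    class RafWritable(val raf: RandomAccessFile) extends Writable {
      def write(b: ByteBuffer) {
        raf.write(b.array, b.position, b.remaining)
        b.position(b.limit) // consume the buffer, matching the other implementations
      }
    }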

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Thu Nov 15 22:54:45 2012
@@ -42,9 +42,7 @@ class PrimitiveApiTest extends JUnit3Sui
   
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-    override val flushInterval = 1
-  }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Thu Nov 15 22:54:45 2012
@@ -33,7 +33,7 @@ import kafka.api.{RequestKeys, TopicMeta
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[Broker] = null
 
   override def setUp() {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Thu Nov 15 22:54:45 2012
@@ -37,7 +37,7 @@ import kafka.utils._
 
 class AsyncProducerTest extends JUnit3Suite {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
 
   override def setUp() {
     super.setUp()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Thu Nov 15 22:54:45 2012
@@ -28,9 +28,7 @@ import kafka.utils.{SystemTime, KafkaSch
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-    override val defaultFlushIntervalMs = 100
-  })
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
   val topic = "foo"
   val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime))
     

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Thu Nov 15 22:54:45 2012
@@ -32,7 +32,6 @@ class LogRecoveryTest extends JUnit3Suit
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaMaxLagTimeMs = 5000L
     override val replicaMaxLagBytes = 10L
-    override val flushInterval = 10
     override val replicaMinBytes = 20
   })
   val topic = "new-topic"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Thu Nov 15 22:54:45 2012
@@ -28,7 +28,7 @@ import junit.framework.Assert._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
   val props = createBrokerConfigs(2)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala Thu Nov 15 22:54:45 2012
@@ -41,4 +41,20 @@ class PerfConfig(args: Array[String]) {
   val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
     "interval as configured by reporting-interval")
   val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ")
+  val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(100)
+  val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(200)
+  val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
+    .withRequiredArg
+    .describedAs("compression codec ")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(0)
+  val helpOpt = parser.accepts("help", "Print usage.")
 }
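
Hoisting message-size, batch-size, and compression-codec up into PerfConfig makes them available to every perf tool that extends it. A sketch of the pattern (the subclass here is hypothetical, and it assumes parser is a field of PerfConfig, as the accepts calls above imply):

    class ExamplePerfConfig(args: Array[String]) extends PerfConfig(args) {
      val options = parser.parse(args : _*)
      val messageSize = options.valueOf(messageSizeOpt).intValue
      val batchSize = options.valueOf(batchSizeOpt).intValue
    }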

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1410088&r1=1410087&r2=1410088&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Thu Nov 15 22:54:45 2012
@@ -94,18 +94,8 @@ object ProducerPerformance extends Loggi
             .withRequiredArg()
             .ofType(classOf[java.lang.Integer])
             .defaultsTo(-1)
-    val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-            .withRequiredArg
-            .describedAs("size")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(100)
     val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
     val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
-    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
-            .withRequiredArg
-            .describedAs("batch size")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(200)
     val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
             .withRequiredArg
             .describedAs("number of threads")
@@ -127,6 +117,20 @@ object ProducerPerformance extends Loggi
             .describedAs("message send time gap")
             .ofType(classOf[java.lang.Integer])
             .defaultsTo(0)
+    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3000)
+    val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
+      "to complete")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
+    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is " +
             "set, the csv metrics will be outputted here")
@@ -154,10 +158,10 @@ object ProducerPerformance extends Loggi
     var isSync = options.has(syncOpt)
     var batchSize = options.valueOf(batchSizeOpt).intValue
     var numThreads = options.valueOf(numThreadsOpt).intValue
-    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
     val seqIdMode = options.has(initialMessageIdOpt)
     var initialMessageId: Int = 0
-    if (seqIdMode)
+    if(seqIdMode)
       initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
     val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
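
The two request-level options let the tool measure the cost of different durability settings, e.g. leader-only acks (1) versus all in-sync replicas (-1, the default). An illustrative fragment, with the tool's pre-existing broker/topic flags elided:

    kafka.perf.ProducerPerformance.main(Array(
      /* ...the broker, topic, and message-count flags defined elsewhere in this file... */
      "--request-num-acks", "1",
      "--request-timeout-ms", "3000"))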


