kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1403553 - in /incubator/kafka/branches/0.8: perf/src/main/scala/kafka/perf/ system_test/broker_failure/bin/ system_test/mirror_maker/bin/ system_test/producer_perf/bin/ system_test/replication_testsuite/testcase_0001/ system_test/utils/
Date Mon, 29 Oct 2012 22:09:44 GMT
Author: junrao
Date: Mon Oct 29 22:09:43 2012
New Revision: 1403553

URL: http://svn.apache.org/viewvc?rev=1403553&view=rev
Log:
extend DumpLogSegments to verify consistency btw data and index; patched by Yang Ye; reviewed
by Neha Narkhede and Jun Rao; KAFKA-577

Modified:
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
    incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/mirror_maker/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-compression-test.sh
    incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-test.sh
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
(original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
Mon Oct 29 22:09:43 2012
@@ -86,6 +86,10 @@ object ConsumerPerformance {
                            .withRequiredArg
                            .describedAs("urls")
                            .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val groupIdOpt = parser.accepts("group", "The group id to consume on.")
                            .withRequiredArg
                            .describedAs("gid")

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/PerfConfig.scala Mon Oct 29
22:09:43 2012
@@ -22,10 +22,6 @@ import joptsimple.OptionParser
 
 class PerfConfig(args: Array[String]) {
   val parser = new OptionParser
-  val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-    .withRequiredArg
-    .describedAs("topic")
-    .ofType(classOf[String])
   val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume")
     .withRequiredArg
     .describedAs("count")

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
(original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
Mon Oct 29 22:09:43 2012
@@ -23,9 +23,10 @@ import kafka.producer._
 import org.apache.log4j.Logger
 import kafka.message.{CompressionCodec, Message}
 import java.text.SimpleDateFormat
-import java.util.{Random, Properties}
+import java.util._
+import collection.immutable.List
 import kafka.utils.{VerifiableProperties, Logging}
-import kafka.metrics.{KafkaCSVMetricsReporter, KafkaMetricsReporterMBean, KafkaMetricsReporter,
KafkaMetricsConfig}
+import kafka.metrics.KafkaCSVMetricsReporter
 
 
 /**
@@ -47,14 +48,9 @@ object ProducerPerformance extends Loggi
     val startMs = System.currentTimeMillis
     val rand = new java.util.Random
 
-    if(!config.hideHeader) {
-      if(!config.showDetailedStats)
+    if(!config.hideHeader)
         println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB,
MB.sec, " +
-          "total.data.sent.in.nMsg, nMsg.sec")
-      else
-        println("time, compression, thread.id, message.size, batch.size, total.data.sent.in.MB,
MB.sec, " +
-          "total.data.sent.in.nMsg, nMsg.sec")
-    }
+                        "total.data.sent.in.nMsg, nMsg.sec")
 
     for(i <- 0 until config.numThreads) {
       executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone,
rand))
@@ -63,58 +59,73 @@ object ProducerPerformance extends Loggi
     allDone.await()
     val endMs = System.currentTimeMillis
     val elapsedSecs = (endMs - startMs) / 1000.0
-    if(!config.showDetailedStats) {
-      val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
-      println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
-        config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize,
config.batchSize,
-        totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
-    }
+    val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
+    println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
+      config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+      config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
+      totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
     System.exit(0)
   }
 
   class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be
specified.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of
broker host and port for bootstrap.")
+            .withRequiredArg
+            .describedAs("hostname:port,..,hostname:port")
+            .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics
to produce to")
       .withRequiredArg
-      .describedAs("hostname:port")
+      .describedAs("topic1,topic2..")
       .ofType(classOf[String])
-    val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request
timeout in ms")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3000)
-    val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks
required for producer request " +
-      "to complete")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(-1)
+    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request
timeout in ms")
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(3000)
+    val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries
number")
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(3)
+    val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer
retry backoff time in milliseconds")
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(100)
+    val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks
required for producer request " +
+            "to complete")
+            .withRequiredArg()
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(-1)
     val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
+            .withRequiredArg
+            .describedAs("size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(100)
     val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will
vary up to the given maximum.")
-    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+    val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single
batch.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
+            .withRequiredArg
+            .describedAs("batch size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(200)
     val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
+            .withRequiredArg
+            .describedAs("number of threads")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(1)
     val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are
sent compressed")
-      .withRequiredArg
-      .describedAs("compression codec ")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
-    val initialMessageIdOpt = parser.accepts("initial-message-id", "If set, messages will
be tagged with an " + 
-        "ID and sent by producer starting from this ID sequentially. Message content will
be String type and " + 
-        "in the form of 'Message:000...1:xxx...'")
-      .withRequiredArg()
-      .describedAs("initial message id")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(0)
+            .withRequiredArg
+            .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec
as 1, SnappyCompressionCodec as 2")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
+    val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating
test data, If set, messages will be tagged with an " +
+            "ID and sent by producer starting from this ID sequentially. Message content
will be String type and " +
+            "in the form of 'Message:000...1:xxx...'")
+            .withRequiredArg()
+            .describedAs("initial message id")
+            .ofType(classOf[java.lang.Integer])
+    val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread
will wait for specified time between two sends")
+            .withRequiredArg()
+            .describedAs("message send time gap")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(0)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the
CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set,
and this parameter is" +
             "set, the csv metrics will be outputed here")
@@ -123,36 +134,40 @@ object ProducerPerformance extends Loggi
       .ofType(classOf[java.lang.String])
 
     val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
+    for(arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
       }
     }
-    val topic = options.valueOf(topicOpt)
+    val topicsStr = options.valueOf(topicsOpt)
+    val topics = topicsStr.split(",")
     val numMessages = options.valueOf(numMessagesOpt).longValue
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    val showDetailedStats = options.has(showDetailedStatsOpt)
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val messageSize = options.valueOf(messageSizeOpt).intValue
     var isFixSize = !options.has(varyMessageSizeOpt)
-    var isAsync = options.has(asyncOpt)
+    var isSync = options.has(syncOpt)
     var batchSize = options.valueOf(batchSizeOpt).intValue
     var numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
-    val initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val seqIdMode = options.has(initialMessageIdOpt)
-    val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
-    val produceRequestRequiredAcks = options.valueOf(produceRequestRequiredAcksOpt).intValue()
+    var initialMessageId: Int = 0
+    if (seqIdMode)
+      initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
+    val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
+    val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
+    val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
+    val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
 
     val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
 
     if (csvMetricsReporterEnabled) {
       val props = new Properties()
-      props.put("kafka.metrics.polling.interval.secs", "5")
+      props.put("kafka.metrics.polling.interval.secs", "1")
       props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
       if (options.has(metricsDirectoryOpt))
         props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
@@ -163,27 +178,7 @@ object ProducerPerformance extends Loggi
       KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
     }
 
-    // override necessary flags in seqIdMode
-    if (seqIdMode) { 
-      batchSize = 1
-      isFixSize = true
-
-      warn("seqIdMode - isAsync is overridden to:" + isAsync)
-      warn("seqIdMode - batchSize is overridden to:" + batchSize)
-      warn("seqIdMode - sFixSize is overridden to: " + isFixSize)
-    }
-  }
-
-  private def getStringOfLength(len: Int) : String = {
-    val strArray = new Array[Char](len)
-    for (i <- 0 until len)
-      strArray(i) = 'x'
-    return new String(strArray)
-  }
-
-  private def getByteArrayOfLength(len: Int): Array[Byte] = {
-    //new Array[Byte](len)
-    new Array[Byte]( if (len == 0) 5 else len )
+    val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
   }
 
   class ProducerThread(val threadId: Int,
@@ -197,117 +192,81 @@ object ProducerPerformance extends Loggi
     props.put("compression.codec", config.compressionCodec.codec.toString)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
-    if(config.isAsync) {
+    if(!config.isSync) {
       props.put("producer.type","async")
       props.put("batch.size", config.batchSize.toString)
       props.put("queue.enqueueTimeout.ms", "-1")
     }
-    props.put("producer.request.required.acks", config.produceRequestRequiredAcks.toString)
-    props.put("producer.request.timeout.ms", config.produceRequestTimeoutMs.toString)
+    props.put("producer.request.required.acks", config.producerRequestRequiredAcks.toString)
+    props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString)
+    props.put("producer.num.retries", config.producerNumRetries.toString)
+    props.put("producer.retry.backoff.ms", config.producerRetryBackoffMs.toString)
 
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[Message, Message](producerConfig)
     val seqIdNumDigit = 10   // no. of digits for max int value
 
+    val messagesPerThread = config.numMessages / config.numThreads
+    debug("Messages per thread = " + messagesPerThread)
+
+    // generate the sequential message ID
+    private val SEP            = ":"              // message field separator
+    private val messageIdLabel = "MessageID"
+    private val threadIdLabel  = "ThreadID"
+    private val topicLabel     = "Topic"
+    private var leftPaddedSeqId : String = ""
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Message
= {
+      // Each thread gets a unique range of sequential no. for its ids.
+      // Eg. 1000 msg in 10 threads => 100 msg per thread
+      // thread 0 IDs :   0 ~  99
+      // thread 1 IDs : 100 ~ 199
+      // thread 2 IDs : 200 ~ 299
+      // . . .
+      leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
+
+      val msgHeader = topicLabel      + SEP +
+              topic           + SEP +
+              threadIdLabel   + SEP +
+              threadId        + SEP +
+              messageIdLabel  + SEP +
+              leftPaddedSeqId + SEP
+
+      val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
+      debug(seqMsgString)
+      return new Message(seqMsgString.getBytes())
+    }
+
+    private def generateProducerData(topic: String, messageId: Long): (ProducerData[Message,
Message], Int) = {
+      val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
+      val message = if(config.seqIdMode) {
+        val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
+        generateMessageWithSeqId(topic, seqId, msgSize)
+      }
+      else {
+        new Message(new Array[Byte](msgSize))
+      }
+      (new ProducerData[Message, Message](topic, null, message), message.payloadSize)
+    }
+
     override def run {
       var bytesSent = 0L
-      var lastBytesSent = 0L
       var nSends = 0
-      var lastNSends = 0
-      var message = new Message(new Array[Byte](config.messageSize))
-      var reportTime = System.currentTimeMillis()
-      var lastReportTime = reportTime
-      val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads
/ config.batchSize
-                              else config.numMessages / config.numThreads
-      debug("Messages per thread = " + messagesPerThread)
-
-      // generate the sequential message ID
-      val SEP            = ":"              // message field separator
-      val messageIdLabel = "MessageID"
-      val threadIdLabel  = "ThreadID"
-      val topicLabel     = "Topic"
-      var leftPaddedSeqId : String = ""
-      
       var j: Long = 0L
       while(j < messagesPerThread) {
-        var strLength = config.messageSize
-        
-        if (config.seqIdMode) {
-          // Each thread gets a unique range of sequential no. for its ids.
-          // Eg. 1000 msg in 10 threads => 100 msg per thread
-          // thread 0 IDs :   0 ~  99
-          // thread 1 IDs : 100 ~ 199
-          // thread 2 IDs : 200 ~ 299
-          // . . .
-          
-          val msgId = config.initialMessageId + (messagesPerThread * threadId) + j
-          leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
-          
-          val msgHeader = topicLabel      + SEP + 
-                          config.topic    + SEP + 
-                          threadIdLabel   + SEP + 
-                          threadId        + SEP + 
-                          messageIdLabel  + SEP + 
-                          leftPaddedSeqId + SEP
-                             
-          val seqMsgString = String.format("%1$-"+config.messageSize+"s", msgHeader).replace('
', 'x')
-          
-          debug(seqMsgString)
-          message = new Message(seqMsgString.getBytes())
-        }
-
-        var messageSet: List[Message] = Nil
-        if(config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            messageSet ::= message
-          }
-        }
-
-        if (!config.isFixSize) {
-          for(k <- 0 until config.batchSize) {
-            strLength = rand.nextInt(config.messageSize)
-            messageSet ::= message
-            bytesSent += message.payloadSize
-          }
-        }else if(!config.isAsync) {
-          bytesSent += config.batchSize*message.payloadSize
-        }
-        try  {
-          if(!config.isAsync) {
-            producer.send(new ProducerData[Message,Message](config.topic, null, messageSet))
-            if(!config.isFixSize) messageSet = Nil
-            nSends += config.batchSize
-          }else {
-            if(!config.isFixSize) {
-              strLength = rand.nextInt(config.messageSize)
-              val messageBytes = getByteArrayOfLength(strLength)
-              rand.nextBytes(messageBytes)
-              val message = new Message(messageBytes)
-              producer.send(new ProducerData[Message,Message](config.topic, message))
-              bytesSent += message.payloadSize
-            }else {
-              producer.send(new ProducerData[Message,Message](config.topic, message))
-              bytesSent += message.payloadSize
+        try {
+          config.topics.foreach(
+            topic =>{
+              val (producerData, bytesSent_) = generateProducerData(topic, j)
+              bytesSent += bytesSent_
+              producer.send(producerData)
+              nSends += 1
+              if(config.messageSendGapMs > 0)
+                Thread.sleep(config.messageSendGapMs)
             }
-            nSends += 1
-          }
-        }catch {
+          )
+        } catch {
           case e: Exception => error("Error sending messages", e)
         }
-        if(nSends % config.reportingInterval == 0) {
-          reportTime = System.currentTimeMillis()
-          val elapsed = (reportTime - lastReportTime)/ 1000.0
-          val mbBytesSent = ((bytesSent - lastBytesSent) * 1.0)/(1024 * 1024)
-          val numMessagesPerSec = (nSends - lastNSends) / elapsed
-          val mbPerSec = mbBytesSent / elapsed
-          val formattedReportTime = config.dateFormat.format(reportTime)
-          if(config.showDetailedStats)
-            println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(formattedReportTime,
config.compressionCodec.codec,
-              threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024),
mbPerSec, nSends, numMessagesPerSec))
-          lastReportTime = reportTime
-          lastBytesSent = bytesSent
-          lastNSends = nSends
-        }
         j += 1
       }
       producer.close()

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
(original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
Mon Oct 29 22:09:43 2012
@@ -117,6 +117,10 @@ object SimpleConsumerPerformance {
                            .withRequiredArg
                            .describedAs("kafka://hostname:port")
                            .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not
already have an established " +
       "offset to consume from, start with the latest message present in the log rather than
the earliest message.")
     val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")

Modified: incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/broker_failure/bin/run-test.sh Mon Oct 29 22:09:43
2012
@@ -450,7 +450,7 @@ start_background_producer() {
         $base_dir/bin/kafka-run-class.sh \
             kafka.perf.ProducerPerformance \
             --brokerinfo zk.connect=localhost:2181 \
-            --topic $topic \
+            --topics $topic \
             --messages $num_msg_per_batch \
             --message-size $message_size \
             --threads $num_producer_threads \

Modified: incubator/kafka/branches/0.8/system_test/mirror_maker/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/mirror_maker/bin/run-test.sh?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/mirror_maker/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/mirror_maker/bin/run-test.sh Mon Oct 29 22:09:43
2012
@@ -131,7 +131,7 @@ start_producer() {
     topic=$1
     zk=$2
     info "start producing messages for topic $topic to zookeeper $zk ..."
-    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk
--topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size
--threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log
&
+    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk
--topics $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size
--threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log
&
     pid_producer=$!
 }
 

Modified: incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-compression-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-compression-test.sh?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-compression-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-compression-test.sh Mon
Oct 29 22:09:43 2012
@@ -28,7 +28,7 @@ $base_dir/../../bin/kafka-server-start.s
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092
--topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads
1 --reporting-interval 100000 num_messages --async --compression-codec 1 
+$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092
--topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads
1 --reporting-interval 100000 num_messages --async --compression-codec 1 
 
 echo "wait for data to be persisted" 
 cur_offset="-1"

Modified: incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-test.sh?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/producer_perf/bin/run-test.sh Mon Oct 29 22:09:43
2012
@@ -28,7 +28,7 @@ $base_dir/../../bin/kafka-server-start.s
 
 sleep 4
 echo "start producing $num_messages messages ..."
-$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092
--topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads
1 --reporting-interval 100000 num_messages --async
+$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092
--topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads
1 --reporting-interval 100000 num_messages --async
 
 echo "wait for data to be persisted" 
 cur_offset="-1"

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
(original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
Mon Oct 29 22:09:43 2012
@@ -29,7 +29,7 @@
       "entity_id": "1",
       "port": "9091",
       "brokerid": "1",
-      "log.file.size": "10240",
+      "log.file.size": "1024000000",
       "log.dir": "/tmp/kafka_server_1_logs",
       "log_filename": "kafka_server_9091.log",
       "config_filename": "kafka_server_9091.properties"
@@ -38,7 +38,7 @@
       "entity_id": "2",
       "port": "9092",
       "brokerid": "2",
-      "log.file.size": "10240",
+      "log.file.size": "1024000000",
       "log.dir": "/tmp/kafka_server_2_logs",
       "log_filename": "kafka_server_9092.log",
       "config_filename": "kafka_server_9092.properties"
@@ -47,7 +47,7 @@
       "entity_id": "3",
       "port": "9093",
       "brokerid": "3",
-      "log.file.size": "10240",
+      "log.file.size": "1024000000",
       "log.dir": "/tmp/kafka_server_3_logs",
       "log_filename": "kafka_server_9093.log",
       "config_filename": "kafka_server_9093.properties"
@@ -58,9 +58,9 @@
       "threads": "5",
       "compression-codec": "0",
       "message-size": "500",
-      "message": "100",
+      "message": "100000",
       "request-num-acks": "-1",
-      "async":"false",
+      "sync":"false",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1403553&r1=1403552&r2=1403553&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Mon Oct 29 22:09:43
2012
@@ -750,7 +750,7 @@ def start_console_consumer(systemTestEnv
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
                    "--consumer-timeout-ms " + timeoutMs,
-                   "--csv-reporter-enable",
+                   "--csv-reporter-enabled",
                    "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer",
entityId, "metrics"),
                    formatterOption,
                    "--from-beginning ",
@@ -864,7 +864,7 @@ def start_producer_in_thread(testcaseEnv
                        "--broker-list " + brokerListStr,
                        "--initial-message-id " + str(initMsgId),
                        "--messages " + noMsgPerBatch,
-                       "--topic " + topic,
+                       "--topics " + topic,
                        "--threads " + threads,
                        "--compression-codec " + compCodec,
                        "--message-size " + messageSize,



Mime
View raw message