kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1205525 - in /incubator/kafka/trunk: core/src/main/scala/kafka/tools/ core/src/main/scala/kafka/utils/ perf/ perf/config/ perf/src/ perf/src/main/ perf/src/main/scala/ perf/src/main/scala/kafka/ perf/src/main/scala/kafka/perf/ project/build/
Date: Wed, 23 Nov 2011 18:19:10 GMT
Author: nehanarkhede
Date: Wed Nov 23 18:19:06 2011
New Revision: 1205525

URL: http://svn.apache.org/viewvc?rev=1205525&view=rev
Log:
KAFKA-176 Fix existing perf tools; patched by nehanarkhede; reviewed by junrao and jaykreps

Added:
    incubator/kafka/trunk/perf/
    incubator/kafka/trunk/perf/config/
    incubator/kafka/trunk/perf/config/log4j.properties
    incubator/kafka/trunk/perf/src/
    incubator/kafka/trunk/perf/src/main/
    incubator/kafka/trunk/perf/src/main/scala/
    incubator/kafka/trunk/perf/src/main/scala/kafka/
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
Removed:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/trunk/project/build/KafkaProject.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1205525&r1=1205524&r2=1205525&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Wed Nov 23 18:19:06 2011
@@ -167,7 +167,8 @@ object ReplayLogProducer extends Logging
     props.put("buffer.size", (64*1024).toString)
     props.put("compression.codec", config.compressionCodec.codec.toString)
     props.put("batch.size", config.batchSize.toString)
-
+    props.put("queue.enqueueTimeout.ms", "-1")
+    
     if(config.isAsync)
       props.put("producer.type", "async")
 

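The one-line change above sets the async producer's enqueue timeout to -1, which makes a send block when the producer's internal queue fills instead of dropping messages, so the replay tool applies back-pressure rather than silently losing data. A minimal sketch of the same configuration in isolation, assuming a 0.7-era Kafka client on the classpath; the broker address and topic are placeholders:

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    object BlockingAsyncProducerSketch {
      def main(args: Array[String]) {
        val props = new Properties
        props.put("broker.list", "0:localhost:9092") // placeholder brokerid:host:port
        props.put("producer.type", "async")          // buffer sends in a background queue
        props.put("queue.enqueueTimeout.ms", "-1")   // -1 blocks the caller when the queue is full
        val producer = new Producer[Message, Message](new ProducerConfig(props))
        producer.send(new ProducerData[Message, Message]("test-topic", new Message("hello".getBytes)))
        producer.close()
      }
    }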
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1205525&r1=1205524&r2=1205525&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Wed Nov 23 18:19:06 2011
@@ -29,6 +29,7 @@ import java.util.Properties
 import scala.collection._
 import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * Helper functions!
@@ -604,6 +605,18 @@ object Utils extends Logging {
     else
       CompressionCodec.getCompressionCodec(codecValueString.toInt)
   }
+
+  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
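The new tryCleanupZookeeper helper deletes the group's entire /consumers/<groupId> subtree so every perf run starts without stale offsets, and the blanket case _ => swallows any failure so cleanup can never abort a run. A usage sketch mirroring the call site added in ConsumerPerformance.scala below; the connection string and group id are placeholders:

    import kafka.utils.Utils

    // Wipe consumer-group state left over from a previous run, then start consuming.
    Utils.tryCleanupZookeeper("localhost:2181", "perf-consumer-12345")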

Added: incubator/kafka/trunk/perf/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/config/log4j.properties?rev=1205525&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/config/log4j.properties (added)
+++ incubator/kafka/trunk/perf/config/log4j.properties Wed Nov 23 18:19:06 2011
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, fileAppender
+
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=perf.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=%m %n 
+
+# Turn on all our debugging info
+log4j.logger.kafka=INFO
+
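This configuration sends the tools' log output to a flat perf.log with bare messages (%m %n), keeping stdout free for the CSV-style stats lines the tools print. A sketch of pointing log4j 1.x at the file programmatically, in case it is not supplied via -Dlog4j.configuration; the path is an assumption for illustration:

    import org.apache.log4j.{Logger, PropertyConfigurator}

    // Load the perf log4j config explicitly (path assumed for illustration).
    PropertyConfigurator.configure("perf/config/log4j.properties")
    Logger.getLogger("kafka.perf").info("perf logging initialized") // written to perf.log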

Added: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1205525&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (added)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala Wed Nov 23 18:19:06 2011
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf
+
+import java.net.URI
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicLong
+import java.nio.channels.ClosedByInterruptException
+import joptsimple._
+import org.apache.log4j.Logger
+import kafka.message.Message
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, Utils}
+import java.util.{Random, Properties}
+import kafka.consumer._
+import java.text.SimpleDateFormat
+
+/**
+ * Performance test for the full zookeeper consumer
+ */
+object ConsumerPerformance {
+  private val logger = Logger.getLogger(getClass())
+
+  def main(args: Array[String]): Unit = {
+
+    val config = new ConsumerPerfConfig(args)
+    logger.info("Starting consumer...")
+    var totalMessagesRead = new AtomicLong(0)
+    var totalBytesRead = new AtomicLong(0)
+
+    if(!config.hideHeader) {
+      if(!config.showDetailedStats)
+        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg,
nMsg.sec")
+      else
+        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+    }
+
+    // clean up zookeeper state for this group id for every perf run
+    Utils.tryCleanupZookeeper(config.consumerConfig.zkConnect, config.consumerConfig.groupId)
+
+    val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
+
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.topic -> config.numThreads))
+    var threadList = List[ConsumerPerfThread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (i <- 0 until streamList.length)
+        threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
+                                              totalMessagesRead, totalBytesRead)
+
+    logger.info("Sleeping for 1000 seconds.")
+    Thread.sleep(1000)
+    logger.info("starting threads")
+    val startMs = System.currentTimeMillis
+    for (thread <- threadList)
+      thread.start
+
+    for (thread <- threadList)
+      thread.shutdown
+
+    val endMs = System.currentTimeMillis
+    val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
+    if(!config.showDetailedStats) {
+      val totalMBRead = (totalBytesRead.get*1.0)/(1024*1024)
+      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
+        config.consumerConfig.fetchSize, totalMBRead, totalMBRead/elapsedSecs, totalMessagesRead.get,
+        totalMessagesRead.get/elapsedSecs))
+    }
+    System.exit(0)
+  }
+
+  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+                                      "Multiple URLS can be given to allow fail-over.")
+                           .withRequiredArg
+                           .describedAs("urls")
+                           .ofType(classOf[String])
+    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
+                           .withRequiredArg
+                           .describedAs("gid")
+                           .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
+                           .ofType(classOf[String])
+    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
+                           .withRequiredArg
+                           .describedAs("size")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1024 * 1024)
+    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
+      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV buffer.")
+                           .withRequiredArg
+                           .describedAs("size")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(2 * 1024 * 1024)
+    val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(10)
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val props = new Properties
+    props.put("groupid", options.valueOf(groupIdOpt))
+    props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
+    props.put("autooffset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else
"smallest")
+    props.put("zk.connect", options.valueOf(zkConnectOpt))
+    props.put("consumer.timeout.ms", "5000")
+    val consumerConfig = new ConsumerConfig(props)
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+  }
+
+  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaMessageStream[Message],
+                           config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
+    extends Thread(name) {
+    private val shutdownLatch = new CountDownLatch(1)
+
+    def shutdown(): Unit = {
+      shutdownLatch.await
+    }
+
+    override def run() {
+      var bytesRead = 0L
+      var messagesRead = 0L
+      val startMs = System.currentTimeMillis
+      var lastReportTime: Long = startMs
+      var lastBytesRead = 0L
+      var lastMessagesRead = 0L
+
+      try {
+        for (message <- stream if messagesRead < config.numMessages) {
+          messagesRead += 1
+          bytesRead += message.payloadSize
+
+          if (messagesRead % config.reportingInterval == 0) {
+            if(config.showDetailedStats)
+              printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
+            lastReportTime = System.currentTimeMillis
+            lastMessagesRead = messagesRead
+            lastBytesRead = bytesRead
+          }
+        }
+      }
+      catch {
+        case _: InterruptedException =>
+        case _: ClosedByInterruptException =>
+        case _: ConsumerTimeoutException =>
+        case e => throw e
+      }
+      totalMessagesRead.addAndGet(messagesRead)
+      totalBytesRead.addAndGet(bytesRead)
+      if(config.showDetailedStats)
+        printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
+      shutdownComplete
+    }
+
+    private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
+                             startMs: Long, endMs: Long) = {
+      val elapsedMs = endMs - startMs
+      val totalMBRead = (bytesRead*1.0)/(1024*1024)
+      val mbRead = ((bytesRead - lastBytesRead)*1.0)/(1024*1024)
+      println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
+        config.consumerConfig.fetchSize, totalMBRead,
+        1000.0*(mbRead/elapsedMs), messagesRead, ((messagesRead - lastMessagesRead)/elapsedMs)*1000.0))
+    }
+
+    private def shutdownComplete() = shutdownLatch.countDown
+  }
+
+}
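Given the options defined above, a run needs only the two required flags plus any overrides. An illustrative invocation, expressed as the argument array main receives; the ZooKeeper address and topic are placeholders:

    // Sketch only: flags correspond to the options registered above.
    kafka.perf.ConsumerPerformance.main(Array(
      "--zookeeper", "localhost:2181", // required
      "--topic", "test-topic",         // required
      "--threads", "5",                // default is 10
      "--messages", "1000000",         // default is Long.MaxValue
      "--show-detailed-stats"          // per-interval lines instead of a single summary
    ))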

Added: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala?rev=1205525&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala (added)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/PerfConfig.scala Wed Nov 23 18:19:06 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.perf
+
+import joptsimple.OptionParser
+import java.text.SimpleDateFormat
+
+class PerfConfig(args: Array[String]) {
+  val parser = new OptionParser
+  val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
+    .withRequiredArg
+    .describedAs("topic")
+    .ofType(classOf[String])
+  val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume")
+    .withRequiredArg
+    .describedAs("count")
+    .ofType(classOf[java.lang.Long])
+    .defaultsTo(Long.MaxValue)
+  val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(5000)
+  val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
+    "See java.text.SimpleDateFormat for options.")
+    .withRequiredArg
+    .describedAs("date format")
+    .ofType(classOf[String])
+    .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")
+  val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
+    "interval as configured by reporting-interval")
+  val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ")
+}
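PerfConfig registers only the shared options and leaves parser.parse to subclasses, so each tool can append its own flags before parsing, exactly as the three tools in this commit do. A minimal sketch of that pattern; the --limit option is invented purely for illustration:

    import kafka.perf.PerfConfig

    // Hypothetical subclass: register one extra option, then parse.
    class MyToolConfig(args: Array[String]) extends PerfConfig(args) {
      val limitOpt = parser.accepts("limit", "Illustrative tool-specific option.")
        .withRequiredArg
        .describedAs("count")
        .ofType(classOf[java.lang.Integer])
        .defaultsTo(100)

      val options = parser.parse(args: _*)  // parse only after all options are registered
      val topic = options.valueOf(topicOpt) // shared option inherited from PerfConfig
      val limit = options.valueOf(limitOpt).intValue
    }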

Added: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1205525&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (added)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Wed Nov 23 18:19:06 2011
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf
+
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+import kafka.producer._
+import async.DefaultEventHandler
+import org.apache.log4j.Logger
+import joptsimple.OptionParser
+import kafka.message.{CompressionCodec, Message}
+import kafka.serializer.DefaultEncoder
+import java.text.SimpleDateFormat
+import java.util.{Date, Random, Properties}
+
+/**
+ * Load test for the producer
+ */
+object ProducerPerformance {
+
+  def main(args: Array[String]) {
+
+    val logger = Logger.getLogger(getClass)
+    val config = new ProducerPerfConfig(args)
+    if(!config.isFixSize)
+      logger.info("WARN: Throughput will be slower due to changing message size per request")
+
+    val totalBytesSent = new AtomicLong(0)
+    val totalMessagesSent = new AtomicLong(0)
+    val executor = Executors.newFixedThreadPool(config.numThreads)
+    val allDone = new CountDownLatch(config.numThreads)
+    val startMs = System.currentTimeMillis
+    val rand = new java.util.Random
+
+    if(!config.hideHeader) {
+      if(!config.showDetailedStats)
+        println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB,
MB.sec, " +
+          "total.data.sent.in.nMsg, nMsg.sec")
+      else
+        println("time, compression, thread.id, message.size, batch.size, total.data.sent.in.MB,
MB.sec, " +
+          "total.data.sent.in.nMsg, nMsg.sec")
+    }
+
+    for(i <- 0 until config.numThreads) {
+      executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
+    }
+
+    allDone.await()
+    val endMs = System.currentTimeMillis
+    val elapsedSecs = (endMs - startMs) / 1000.0
+    if(!config.showDetailedStats) {
+      val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
+      println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
+        config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize,
+        totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
+    }
+    System.exit(0)
+  }
+
+  class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list).")
+      .withRequiredArg
+      .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+      .ofType(classOf[String])
+    val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
+    val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
+    val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(10)
+    val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
+      .withRequiredArg
+      .describedAs("compression codec ")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+
+    val options = parser.parse(args : _*)
+    for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+    val brokerInfo = options.valueOf(brokerInfoOpt)
+    val messageSize = options.valueOf(messageSizeOpt).intValue
+    val isFixSize = !options.has(varyMessageSizeOpt)
+    val isAsync = options.has(asyncOpt)
+    var batchSize = options.valueOf(batchSizeOpt).intValue
+    val numThreads = options.valueOf(numThreadsOpt).intValue
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+  }
+
+  private def getStringOfLength(len: Int) : String = {
+    val strArray = new Array[Char](len)
+    for (i <- 0 until len)
+      strArray(i) = 'x'
+    return new String(strArray)
+  }
+
+  class ProducerThread(val threadId: Int,
+                           val config: ProducerPerfConfig,
+                           val totalBytesSent: AtomicLong,
+                           val totalMessagesSent: AtomicLong,
+                           val allDone: CountDownLatch,
+                           val rand: Random) extends Runnable {
+    val logger = Logger.getLogger(getClass)
+    val props = new Properties()
+    val brokerInfoList = config.brokerInfo.split("=")
+    if (brokerInfoList(0) == "zk.connect") {
+      props.put("zk.connect", brokerInfoList(1))
+      props.put("zk.sessiontimeout.ms", "300000")
+    }
+    else
+      props.put("broker.list", brokerInfoList(1))
+    props.put("compression.codec", config.compressionCodec.codec.toString)
+    props.put("reconnect.interval", Integer.MAX_VALUE.toString)
+    props.put("buffer.size", (64*1024).toString)
+    if(config.isAsync) {
+      props.put("producer.type","async")
+      props.put("batch.size", config.batchSize.toString)
+      props.put("queue.enqueueTimeout.ms", "-1")
+    }
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[Message, Message](producerConfig)
+
+    override def run {
+      var bytesSent = 0L
+      var lastBytesSent = 0L
+      var nSends = 0
+      var lastNSends = 0
+      val message = new Message(new Array[Byte](config.messageSize))
+      var reportTime = System.currentTimeMillis()
+      var lastReportTime = reportTime
+      val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
+                              else config.numMessages / config.numThreads
+      if(logger.isDebugEnabled) logger.debug("Messages per thread = " + messagesPerThread)
+      var messageSet: List[Message] = Nil
+      for(k <- 0 until config.batchSize) {
+        messageSet ::= message
+      }
+
+      var j: Long = 0L
+      while(j < messagesPerThread) {
+        var strLength = config.messageSize
+        if (!config.isFixSize) {
+          for(k <- 0 until config.batchSize) {
+            strLength = rand.nextInt(config.messageSize)
+            val message = new Message(getStringOfLength(strLength).getBytes)
+            messageSet ::= message
+            bytesSent += message.payloadSize
+          }
+        }else if(!config.isAsync) {
+          bytesSent += config.batchSize*message.payloadSize
+        }
+        try  {
+          if(!config.isAsync) {
+            producer.send(new ProducerData[Message,Message](config.topic, null, messageSet))
+            nSends += config.batchSize
+          }else {
+            if(!config.isFixSize) {
+              strLength = rand.nextInt(config.messageSize)
+              val message = new Message(getStringOfLength(strLength).getBytes)
+              producer.send(new ProducerData[Message,Message](config.topic, message))
+              bytesSent += message.payloadSize
+            }else {
+              producer.send(new ProducerData[Message,Message](config.topic, message))
+              bytesSent += message.payloadSize
+            }
+            nSends += 1
+          }
+        }catch {
+          case e: Exception => e.printStackTrace
+        }
+        if(nSends % config.reportingInterval == 0) {
+          reportTime = System.currentTimeMillis()
+          val elapsed = (reportTime - lastReportTime)/ 1000.0
+          val mbBytesSent = ((bytesSent - lastBytesSent) * 1.0)/(1024 * 1024)
+          val numMessagesPerSec = (nSends - lastNSends) / elapsed
+          val mbPerSec = mbBytesSent / elapsed
+          val formattedReportTime = config.dateFormat.format(reportTime)
+          if(config.showDetailedStats)
+            println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(formattedReportTime, config.compressionCodec.codec,
+              threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
+          lastReportTime = reportTime
+          lastBytesSent = bytesSent
+          lastNSends = nSends
+        }
+        j += 1
+      }
+      producer.close()
+      totalBytesSent.addAndGet(bytesSent)
+      totalMessagesSent.addAndGet(nSends)
+      allDone.countDown()
+    }
+  }
+}
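Note that --brokerinfo carries either a zk.connect or a broker.list value, which ProducerThread above splits on '=' to choose between ZooKeeper discovery and a static broker list, and that --messages is checked as required here, presumably because the Long.MaxValue default would effectively never finish. An illustrative invocation; addresses and topic are placeholders:

    // Sketch only: async run against a ZooKeeper-discovered broker set.
    kafka.perf.ProducerPerformance.main(Array(
      "--brokerinfo", "zk.connect=localhost:2181", // or "broker.list=0:localhost:9092"
      "--topic", "test-topic",
      "--messages", "500000",                      // required for this tool
      "--message-size", "200",
      "--async",
      "--batch-size", "100"
    ))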

Added: incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1205525&view=auto
==============================================================================
--- incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (added)
+++ incubator/kafka/trunk/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala Wed Nov 23 18:19:06 2011
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.perf
+
+import java.net.URI
+import joptsimple._
+import kafka.utils._
+import kafka.server._
+import kafka.consumer.SimpleConsumer
+import org.apache.log4j.Logger
+import kafka.api.{OffsetRequest, FetchRequest}
+import java.text.SimpleDateFormat
+
+/**
+ * Performance test for the simple consumer
+ */
+object SimpleConsumerPerformance {
+
+  def main(args: Array[String]) {
+    val logger = Logger.getLogger(getClass)
+    val config = new ConsumerPerfConfig(args)
+
+    if(!config.hideHeader) {
+      if(!config.showDetailedStats)
+        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg,
nMsg.sec")
+      else
+        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
+    }
+
+    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
+
+    // reset to latest or smallest offset
+    var offset: Long = if(config.fromLatest) consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.LatestTime, 1).head
+                       else consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.EarliestTime, 1).head
+
+    val startMs = System.currentTimeMillis
+    var done = false
+    var totalBytesRead = 0L
+    var totalMessagesRead = 0L
+    var consumedInterval = 0
+    var lastReportTime: Long = startMs
+    var lastBytesRead = 0L
+    var lastMessagesRead = 0L
+    while(!done) {
+      val messages = consumer.fetch(new FetchRequest(config.topic, config.partition, offset, config.fetchSize))
+      var messagesRead = 0
+      var bytesRead = 0
+
+      for(message <- messages) {
+        messagesRead += 1
+        bytesRead += message.message.payloadSize
+      }
+      
+      if(messagesRead == 0 || totalMessagesRead > config.numMessages)
+        done = true
+      else
+        offset += messages.validBytes
+      
+      totalBytesRead += bytesRead
+      totalMessagesRead += messagesRead
+      consumedInterval += messagesRead
+      
+      if(consumedInterval > config.reportingInterval) {
+        if(config.showDetailedStats) {
+          val reportTime = System.currentTimeMillis
+          val elapsed = (reportTime - lastReportTime)/1000.0
+          val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024)
+          println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize,
+            (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
+            totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
+        }
+        lastReportTime = SystemTime.milliseconds
+        lastBytesRead = totalBytesRead
+        lastMessagesRead = totalMessagesRead
+        consumedInterval = 0
+      }
+    }
+    val reportTime = System.currentTimeMillis
+    val elapsed = (reportTime - startMs) / 1000.0
+
+    if(!config.showDetailedStats) {
+      val totalMBRead = (totalBytesRead*1.0)/(1024*1024)
+      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
+        config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed,
+        totalMessagesRead, totalMessagesRead/elapsed))
+    }
+    System.exit(0)
+  }
+
+  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+                           .withRequiredArg
+                           .describedAs("kafka://hostname:port")
+                           .ofType(classOf[String])
+    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
+      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
+    val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")
+                           .withRequiredArg
+                           .describedAs("partition")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(0)
+    val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
+                           .withRequiredArg
+                           .describedAs("bytes")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1024*1024)
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(topicOpt, urlOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+    val url = new URI(options.valueOf(urlOpt))
+    val fetchSize = options.valueOf(fetchSizeOpt).intValue
+    val fromLatest = options.has(resetBeginningOffsetOpt)
+    val partition = options.valueOf(partitionOpt).intValue
+    val topic = options.valueOf(topicOpt)
+    val numMessages = options.valueOf(numMessagesOpt).longValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val showDetailedStats = options.has(showDetailedStatsOpt)
+    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
+    val hideHeader = options.has(hideHeaderOpt)
+  }
+}
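The heart of this tool is the manual fetch loop: request up to fetch-size bytes starting at the current offset, count the messages returned, and advance the offset by validBytes so the next request begins after the last complete message. A stripped-down sketch of that loop against the same 0.7-era API; host, topic, and partition are placeholders:

    import kafka.api.{FetchRequest, OffsetRequest}
    import kafka.consumer.SimpleConsumer

    object FetchLoopSketch {
      def main(args: Array[String]) {
        // Placeholder arguments: host, port, socket timeout ms, socket buffer bytes.
        val consumer = new SimpleConsumer("localhost", 9092, 30 * 1000, 2 * 1024 * 1024)
        var offset: Long = consumer.getOffsetsBefore("test-topic", 0, OffsetRequest.EarliestTime, 1).head
        var done = false
        while (!done) {
          val messages = consumer.fetch(new FetchRequest("test-topic", 0, offset, 1024 * 1024))
          var read = 0
          for (m <- messages) read += 1
          if (read == 0) done = true           // nothing returned: caught up with the log
          else offset += messages.validBytes   // skip past the fully consumed bytes
        }
      }
    }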

Modified: incubator/kafka/trunk/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build/KafkaProject.scala?rev=1205525&r1=1205524&r2=1205525&view=diff
==============================================================================
--- incubator/kafka/trunk/project/build/KafkaProject.scala (original)
+++ incubator/kafka/trunk/project/build/KafkaProject.scala Wed Nov 23 18:19:06 2011
@@ -23,6 +23,7 @@ class KafkaProject(info: ProjectInfo) ex
   lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
   lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core)
   lazy val contrib = project("contrib", "contrib", new ContribProject(_))
+  lazy val perf = project("perf", "perf", new KafkaPerfProject(_))
 
   lazy val releaseZipTask = core.packageDistTask
 
@@ -81,9 +82,6 @@ class KafkaProject(info: ProjectInfo) ex
       ZkClientDepAdder(pom)
     }
 
-    override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.com/maven2",
-      "Oracle Maven 2 Repository" at "http://download.oracle.com/maven", "maven.org" at "http://repo2.maven.org/maven2/")
-
     override def artifactID = "kafka"
     override def filterScalaJars = false
 
@@ -146,6 +144,29 @@ class KafkaProject(info: ProjectInfo) ex
 
   }
 
+  class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
+     with IdeaProject
+     with CoreDependencies {
+    val perfPackageAction = packageAllAction
+    val dependsOnCore = core
+
+  // The issue: in going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+  // some dependencies on various sun and javax packages.
+    override def ivyXML =
+    <dependencies>
+      <exclude module="javax"/>
+      <exclude module="jmxri"/>
+      <exclude module="jmxtools"/>
+      <exclude module="mail"/>
+      <exclude module="jms"/>
+    </dependencies>
+
+    override def artifactID = "kafka-perf"
+    override def filterScalaJars = false
+    override def javaCompileOptions = super.javaCompileOptions ++
+      List(JavaCompileOption("-Xlint:unchecked"))
+  }
+
   class KafkaExamplesProject(info: ProjectInfo) extends DefaultProject(info)
      with IdeaProject
      with CoreDependencies {
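The build change registers perf as a fourth subproject and repeats the log4j 1.2.15 exclusions because, as the comment above notes, 1.2.15 dragged in jmx/jms/mail dependencies that are unwanted here. For orientation, a minimal sketch of the sbt 0.7 multi-project shape; the commit itself wires the core dependency through val dependsOnCore = core inside the class, while this sketch uses the explicit-argument form seen on the examples line:

    import sbt._

    // Sketch only: an sbt 0.7-style parent build declaring a perf module that depends on core.
    class BuildSketch(info: ProjectInfo) extends ParentProject(info) {
      lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
      lazy val perf = project("perf", "perf", new KafkaPerfProject(_), core) // explicit dependency

      class CoreKafkaProject(info: ProjectInfo) extends DefaultProject(info)
      class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info) {
        override def artifactID = "kafka-perf"
      }
    }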


