kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1359813 - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala perf/src/main/scala/kafka/perf/ProducerPerformance.scala system_test/common/util.sh system_test/single_host_multi_brokers/bin/run-test.sh
Date Tue, 10 Jul 2012 18:08:07 GMT
Author: junrao
Date: Tue Jul 10 18:08:07 2012
New Revision: 1359813

URL: http://svn.apache.org/viewvc?rev=1359813&view=rev
Log:
broker failure system test broken on replication branch; patched by John Fung; reviewed by
Joel Koshy and Jun Rao; KAFKA-306

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/branches/0.8/system_test/common/util.sh
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1359813&r1=1359812&r2=1359813&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
Tue Jul 10 18:08:07 2012
@@ -27,6 +27,7 @@ import java.io.PrintStream
 import kafka.message._
 import kafka.utils.{Utils, Logging}
 import kafka.utils.ZKStringSerializer
+import kafka.serializer.StringDecoder
 
 /**
  * Consumer that dumps messages out to standard out.
@@ -235,7 +236,7 @@ object ConsoleConsumer extends Logging {
     override def init(props: Properties) {
       topicStr = props.getProperty("topic")
       if (topicStr != null) 
-        topicStr = topicStr + "-"
+        topicStr = topicStr + ":"
       else
         topicStr = ""
     }
@@ -246,6 +247,27 @@ object ConsoleConsumer extends Logging {
     }
   }
   
+  class DecodedMessageFormatter extends MessageFormatter {
+    var topicStr: String = _
+    val decoder = new StringDecoder()
+    
+    override def init(props: Properties) {
+      topicStr = props.getProperty("topic")
+      if (topicStr != null) 
+        topicStr = topicStr + ":"
+      else
+        topicStr = ""
+    }
+    
+    def writeTo(message: Message, output: PrintStream) {
+      try {
+        output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize)
+      } catch {
+        case e => e.printStackTrace()
+      }
+    }
+  }
+  
   def tryCleanupZookeeper(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1359813&r1=1359812&r2=1359813&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
(original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
Tue Jul 10 18:08:07 2012
@@ -91,12 +91,19 @@ object ProducerPerformance extends Loggi
       .withRequiredArg
       .describedAs("count")
       .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10)
+      .defaultsTo(1)
     val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are
sent compressed")
       .withRequiredArg
       .describedAs("compression codec ")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
+    val initialMessageIdOpt = parser.accepts("initial-message-id", "If set, messages will
be tagged with an " + 
+        "ID and sent by producer starting from this ID sequentially. Message content will
be String type and " + 
+        "in the form of 'Message:000...1:xxx...'")
+      .withRequiredArg()
+      .describedAs("initial message id")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
@@ -114,11 +121,24 @@ object ProducerPerformance extends Loggi
     val hideHeader = options.has(hideHeaderOpt)
     val brokerInfo = options.valueOf(brokerInfoOpt)
     val messageSize = options.valueOf(messageSizeOpt).intValue
-    val isFixSize = !options.has(varyMessageSizeOpt)
-    val isAsync = options.has(asyncOpt)
+    var isFixSize = !options.has(varyMessageSizeOpt)
+    var isAsync = options.has(asyncOpt)
     var batchSize = options.valueOf(batchSizeOpt).intValue
-    val numThreads = options.valueOf(numThreadsOpt).intValue
+    var numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+    val initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
+    val seqIdMode = options.has(initialMessageIdOpt)
+    
+    // override necessary flags in seqIdMode
+    if (seqIdMode) { 
+      isAsync = true
+      batchSize = 1
+      isFixSize = true
+
+      warn("seqIdMode - isAsync is overridden to:" + isAsync)
+      warn("seqIdMode - batchSize is overridden to:" + batchSize)
+      warn("seqIdMode - isFixSize is overridden to: " + isFixSize)
+    }
   }
 
   private def getStringOfLength(len: Int) : String = {
@@ -157,18 +177,27 @@ object ProducerPerformance extends Loggi
     }
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[Message, Message](producerConfig)
+    val seqIdNumDigit = 10   // no. of digits for max int value
 
     override def run {
       var bytesSent = 0L
       var lastBytesSent = 0L
       var nSends = 0
       var lastNSends = 0
-      val message = new Message(new Array[Byte](config.messageSize))
+      var message = new Message(new Array[Byte](config.messageSize))
       var reportTime = System.currentTimeMillis()
       var lastReportTime = reportTime
       val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads
/ config.batchSize
                               else config.numMessages / config.numThreads
       debug("Messages per thread = " + messagesPerThread)
+
+      // generate the sequential message ID
+      val SEP            = ":"              // message field separator
+      val messageIdLabel = "MessageID"
+      val threadIdLabel  = "ThreadID"
+      val topicLabel     = "Topic"
+      var leftPaddedSeqId : String = ""
+      
       var messageSet: List[Message] = Nil
       if(config.isFixSize) {
         for(k <- 0 until config.batchSize) {
@@ -178,6 +207,31 @@ object ProducerPerformance extends Loggi
       var j: Long = 0L
       while(j < messagesPerThread) {
         var strLength = config.messageSize
+        
+        if (config.seqIdMode) {
+          // Each thread gets a unique range of sequential no. for its ids.
+          // Eg. 1000 msg in 10 threads => 100 msg per thread
+          // thread 0 IDs :   0 ~  99
+          // thread 1 IDs : 100 ~ 199
+          // thread 2 IDs : 200 ~ 299
+          // . . .
+          
+          val msgId = config.initialMessageId + (messagesPerThread * threadId) + j
+          leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
+          
+          val msgHeader = topicLabel      + SEP + 
+                          config.topic    + SEP + 
+                          threadIdLabel   + SEP + 
+                          threadId        + SEP + 
+                          messageIdLabel  + SEP + 
+                          leftPaddedSeqId + SEP
+                             
+          val seqMsgString = String.format("%1$-"+config.messageSize+"s", msgHeader).replace(' ', 'x')
+          
+          debug(seqMsgString)
+          message = new Message(seqMsgString.getBytes())
+        }
+                
         if (!config.isFixSize) {
           for(k <- 0 until config.batchSize) {
             strLength = rand.nextInt(config.messageSize)
@@ -200,11 +254,9 @@ object ProducerPerformance extends Loggi
               rand.nextBytes(messageBytes)
               val message = new Message(messageBytes)
               producer.send(new ProducerData[Message,Message](config.topic, message))
-              debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize
             }else {
               producer.send(new ProducerData[Message,Message](config.topic, message))
-              debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize
             }
             nSends += 1

Modified: incubator/kafka/branches/0.8/system_test/common/util.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/common/util.sh?rev=1359813&r1=1359812&r2=1359813&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/common/util.sh (original)
+++ incubator/kafka/branches/0.8/system_test/common/util.sh Tue Jul 10 18:08:07 2012
@@ -27,7 +27,7 @@ get_random_range() {
     up=$2
     range=$(($up - $lo + 1))
 
-    return $(($(($RANDOM % range)) + $lo))
+    echo $(($(($RANDOM % range)) + $lo))
 }
 
 # =========================================

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1359813&r1=1359812&r2=1359813&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Tue
Jul 10 18:08:07 2012
@@ -58,14 +58,14 @@ readonly producer_prop_pathname=${config
 readonly consumer_prop_pathname=${config_dir}/consumer.properties
 
 readonly producer_perf_log_pathname=${base_dir}/producer_perf_output.log
-readonly producer_perf_crc_log_pathname=${base_dir}/producer_perf_crc.log
-readonly producer_perf_crc_sorted_log_pathname=${base_dir}/producer_perf_crc_sorted.log
-readonly producer_perf_crc_sorted_uniq_log_pathname=${base_dir}/producer_perf_crc_sorted_uniq.log
+readonly producer_perf_mid_log_pathname=${base_dir}/producer_perf_mid.log
+readonly producer_perf_mid_sorted_log_pathname=${base_dir}/producer_perf_mid_sorted.log
+readonly producer_perf_mid_sorted_uniq_log_pathname=${base_dir}/producer_perf_mid_sorted_uniq.log
 
 readonly console_consumer_log_pathname=${base_dir}/console_consumer.log
-readonly console_consumer_crc_log_pathname=${base_dir}/console_consumer_crc.log
-readonly console_consumer_crc_sorted_log_pathname=${base_dir}/console_consumer_crc_sorted.log
-readonly console_consumer_crc_sorted_uniq_log_pathname=${base_dir}/console_consumer_crc_sorted_uniq.log
+readonly console_consumer_mid_log_pathname=${base_dir}/console_consumer_mid.log
+readonly console_consumer_mid_sorted_log_pathname=${base_dir}/console_consumer_mid_sorted.log
+readonly console_consumer_mid_sorted_uniq_log_pathname=${base_dir}/console_consumer_mid_sorted_uniq.log
 
 readonly this_test_stderr_output_log_pathname=${base_dir}/this_test_stderr_output.log
 
@@ -129,20 +129,20 @@ cleanup() {
     rm -f $this_test_stderr_output_log_pathname
 
     rm -f $producer_perf_log_pathname
-    rm -f $producer_perf_crc_log_pathname
-    rm -f $producer_perf_crc_sorted_log_pathname
-    rm -f $producer_perf_crc_sorted_uniq_log_pathname
+    rm -f $producer_perf_mid_log_pathname
+    rm -f $producer_perf_mid_sorted_log_pathname
+    rm -f $producer_perf_mid_sorted_uniq_log_pathname
 
     rm -f $console_consumer_log_pathname
-    rm -f $console_consumer_crc_log_pathname
-    rm -f $console_consumer_crc_sorted_log_pathname
-    rm -f $console_consumer_crc_sorted_uniq_log_pathname
+    rm -f $console_consumer_mid_log_pathname
+    rm -f $console_consumer_mid_sorted_log_pathname
+    rm -f $console_consumer_mid_sorted_uniq_log_pathname
 }
 
 get_leader_brokerid() {
     log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
     info "found the log line: $log_line"
-    broker_id=`echo $log_line | awk -F ' ' '{print $5}'`
+    broker_id=`echo $log_line | sed s'/^.*INFO Broker //g' | awk -F ' ' '{print $1}'`
 
     return $broker_id
 }
@@ -189,6 +189,7 @@ start_producer_perf() {
     this_topic=$1
     zk_conn_str=$2
     no_msg_to_produce=$3
+    init_msg_id=$4
 
     info "starting producer performance"
 
@@ -196,10 +197,9 @@ start_producer_perf() {
         --brokerinfo "zk.connect=${zk_conn_str}" \
         --topic ${this_topic} \
         --messages $no_msg_to_produce \
-        --vary-message-size \
         --message-size 100 \
-        --threads 1 \
-        --async \
+        --threads 5 \
+        --initial-message-id $init_msg_id \
         2>&1 >> $producer_perf_log_pathname
 }
 
@@ -211,7 +211,7 @@ start_console_consumer() {
     $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
         --zookeeper $this_zk_conn_str \
         --topic $this_consumer_topic \
-        --formatter 'kafka.consumer.ConsoleConsumer$ChecksumMessageFormatter' \
+        --formatter 'kafka.consumer.ConsoleConsumer$DecodedMessageFormatter' \
         --consumer-timeout-ms $consumer_timeout_ms \
         2>&1 >> $console_consumer_log_pathname &
 }
@@ -269,21 +269,21 @@ validate_results() {
         info "##     ==> crc ${kafka_first_data_file_checksums[$i]}"
     done
 
-    # get the checksums from messages produced and consumed
-    grep checksum $console_consumer_log_pathname | tr -d ' ' | awk -F ':' '{print $2}' >
$console_consumer_crc_log_pathname
-    grep checksum $producer_perf_log_pathname | tr ' ' '\n' | grep checksum | awk -F ':'
'{print $2}' > $producer_perf_crc_log_pathname
+    # get the MessageID from messages produced and consumed
+    grep MessageID $console_consumer_log_pathname | sed s'/^.*MessageID://g' | awk -F ':'
'{print $1}' > $console_consumer_mid_log_pathname
+    grep MessageID $producer_perf_log_pathname    | sed s'/^.*MessageID://g' | awk -F ':'
'{print $1}' > $producer_perf_mid_log_pathname
 
-    sort $console_consumer_crc_log_pathname > $console_consumer_crc_sorted_log_pathname
-    sort $producer_perf_crc_log_pathname    > $producer_perf_crc_sorted_log_pathname
+    sort $console_consumer_mid_log_pathname > $console_consumer_mid_sorted_log_pathname
+    sort $producer_perf_mid_log_pathname    > $producer_perf_mid_sorted_log_pathname
 
-    sort -u $console_consumer_crc_sorted_log_pathname > $console_consumer_crc_sorted_uniq_log_pathname
-    sort -u $producer_perf_crc_sorted_log_pathname    > $producer_perf_crc_sorted_uniq_log_pathname
+    sort -u $console_consumer_mid_sorted_log_pathname > $console_consumer_mid_sorted_uniq_log_pathname
+    sort -u $producer_perf_mid_sorted_log_pathname    > $producer_perf_mid_sorted_uniq_log_pathname
 
-    msg_count_from_console_consumer=`cat $console_consumer_crc_log_pathname | wc -l | tr -d ' '`
-    uniq_msg_count_from_console_consumer=`cat $console_consumer_crc_sorted_uniq_log_pathname
| wc -l | tr -d ' '`
+    msg_count_from_console_consumer=`cat $console_consumer_mid_log_pathname | wc -l | tr
-d ' '`
+    uniq_msg_count_from_console_consumer=`cat $console_consumer_mid_sorted_uniq_log_pathname
| wc -l | tr -d ' '`
 
-    msg_count_from_producer_perf=`cat $producer_perf_crc_log_pathname | wc -l | tr -d ' '`
-    uniq_msg_count_from_producer_perf=`cat $producer_perf_crc_sorted_uniq_log_pathname |
wc -l | tr -d ' '`
+    msg_count_from_producer_perf=`cat $producer_perf_mid_log_pathname | wc -l | tr -d ' '`
+    uniq_msg_count_from_producer_perf=`cat $producer_perf_mid_sorted_uniq_log_pathname |
wc -l | tr -d ' '`
 
     # report the findings
     echo
@@ -401,7 +401,8 @@ start_test() {
         info "sleeping for 5s"
         sleep 5
 
-        start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size
+        init_id=$(( ($i - 1) * $producer_msg_batch_size ))
+        start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
         info "sleeping for 15s"
         sleep 15
         echo



Mime
View raw message