kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1176672 - in /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async: DefaultEventHandler.scala ProducerSendThread.scala
Date Wed, 28 Sep 2011 00:51:51 GMT
Author: junrao
Date: Wed Sep 28 00:51:51 2011
New Revision: 1176672

URL: http://svn.apache.org/viewvc?rev=1176672&view=rev
Log:
Bug in the queue timeout logic of the async producer; patched by Neha Narkhede; reviewed by Jun Rao; KAFKA-138

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1176672&r1=1176671&r2=1176672&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Sep 28 00:51:51 2011
@@ -83,8 +83,8 @@ private[kafka] class DefaultEventHandler
               case _ =>
                 if(config.compressedTopics.contains(topicAndEvents._1._1)) {
                   if(logger.isTraceEnabled)
-                    logger.trace("Sending %d messages with compression %d to topic %s on partition %d"
-                      .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, config.compressionCodec.codec))
+                    logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+                      .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
                   new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
                 }
                 else {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1176672&r1=1176671&r2=1176672&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Sep 28 00:51:51 2011
@@ -69,7 +69,7 @@ private[async] class ProducerSendThread[
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
-    Stream.continually(queue.poll(scala.math.max(0, queueTime - (lastSend - SystemTime.milliseconds)), TimeUnit.MILLISECONDS))
+    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                       .takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach {
       currentQueueItem =>
         val elapsed = (SystemTime.milliseconds - lastSend)
@@ -129,4 +129,4 @@ private[async] class ProducerSendThread[
         logger.trace(event.getData.toString)
     }
   }
-}
\ No newline at end of file
+}



Mime
View raw message