kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1310482 - /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
Date Fri, 06 Apr 2012 16:59:45 GMT
Author: junrao
Date: Fri Apr  6 16:59:44 2012
New Revision: 1310482

URL: http://svn.apache.org/viewvc?rev=1310482&view=rev
Log:
CallbackHandler.afterDequeuingExistingData is not called during event queue timeout; patched
by Jun Rao; reviewed by Neha Narkhede; KAFKA-326

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1310482&r1=1310481&r2=1310482&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Fri Apr  6 16:59:44 2012
@@ -73,18 +73,21 @@ private[async] class ProducerSendThread[
         // check if the queue time is reached. This happens when the poll method above returns after a timeout and
         // returns a null object
         val expired = currentQueueItem == null
-        if(currentQueueItem != null) {
+        if(currentQueueItem != null)
           trace("Dequeued item for topic %s and partition %d"
               .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
-          // handle the dequeued current item
-          if(cbkHandler != null)
-            events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
-          else
-            events += currentQueueItem
 
-          // check if the batch size is reached
-          full = events.size >= batchSize
+        // handle the dequeued current item
+        if(cbkHandler != null)
+          events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
+        else {
+          if (currentQueueItem != null)
+            events += currentQueueItem
         }
+
+        // check if the batch size is reached
+        full = events.size >= batchSize
+
         if(full || expired) {
           if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
           if(full) debug("Batch full. Sending..")



Mime
View raw message