kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1159465 - in /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async: DefaultEventHandler.scala ProducerSendThread.scala
Date Fri, 19 Aug 2011 01:00:08 GMT
Author: nehanarkhede
Date: Fri Aug 19 01:00:07 2011
New Revision: 1159465

URL: http://svn.apache.org/viewvc?rev=1159465&view=rev
Log:
Bug in the collate logic of the DefaultEventHandler dispatches empty list of messages using
the producer KAFKA-110; patched by Neha; reviewed by Jun

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1159465&r1=1159464&r2=1159465&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
Fri Aug 19 01:00:07 2011
@@ -104,7 +104,8 @@ private[kafka] class DefaultEventHandler
       remainingEvents = topicEvents._2
       distinctPartitions.foreach { p =>
         val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition ==
p)))._1
-        collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
+		if(topicPartitionEvents.size > 0)
+          collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
       }
     }
     collatedEvents

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1159465&r1=1159464&r2=1159465&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
Fri Aug 19 01:00:07 2011
@@ -109,7 +109,8 @@ private[async] class ProducerSendThread[
   def tryToHandle(events: Seq[QueueItem[T]]) {
     try {
       if(logger.isDebugEnabled) logger.debug("Handling " + events.size + " events")
-      handler.handle(events, underlyingProducer, serializer)
+      if(events.size > 0)
+        handler.handle(events, underlyingProducer, serializer)
     }catch {
       case e: Exception => logger.error("Error in handling batch of " + events.size +
" events", e)
     }



Mime
View raw message