kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1160946 - in /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async: AsyncProducer.scala DefaultEventHandler.scala ProducerSendThread.scala
Date Wed, 24 Aug 2011 00:44:22 GMT
Author: nehanarkhede
Date: Wed Aug 24 00:44:22 2011
New Revision: 1160946

URL: http://svn.apache.org/viewvc?rev=1160946&view=rev
Log:
AsyncProducer shutdown logic causes data loss; KAFKA-116; patched by nehanarkhede; reviewed
by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1160946&r1=1160945&r2=1160946&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Wed
Aug 24 00:44:22 2011
@@ -90,26 +90,26 @@ private[kafka] class AsyncProducer[T](co
     if(cbkHandler != null)
       data = cbkHandler.beforeEnqueue(data)
 
-    val added = if (config.enqueueTimeoutMs != 0) {
-      try {
-        if (config.enqueueTimeoutMs < 0) {
-          queue.put(data)
-          true
+    val added = config.enqueueTimeoutMs match {
+      case 0  =>
+        queue.offer(data)
+      case _  =>
+        try {
+          config.enqueueTimeoutMs < 0 match {
+          case true =>
+            queue.put(data)
+            true
+          case _ =>
+            queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+          }
         }
-        else {
-          queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+        catch {
+          case e: InterruptedException =>
+            val msg = "%s interrupted during enqueue of event %s.".format(
+              getClass.getSimpleName, event.toString)
+            logger.error(msg)
+            throw new AsyncProducerInterruptedException(msg)
         }
-      }
-      catch {
-        case e: InterruptedException =>
-          val msg = "%s interrupted during enqueue of event %s.".format(
-            getClass.getSimpleName, event.toString)
-          logger.error(msg)
-          throw new AsyncProducerInterruptedException(msg)
-      }
-    }
-    else {
-      queue.offer(data)
     }
 
     if(cbkHandler != null)
@@ -121,7 +121,7 @@ private[kafka] class AsyncProducer[T](co
       throw new QueueFullException("Event queue is full of unsent messages, could not send
event: " + event.toString)
     }else {
       if(logger.isTraceEnabled) {
-        logger.trace("Added event to send queue for topic: " + topic + ":" + event.toString)
+        logger.trace("Added event to send queue for topic: " + topic + ", partition: " +
partition + ":" + event.toString)
         logger.trace("Remaining queue size: " + queue.remainingCapacity)
       }
     }
@@ -132,11 +132,13 @@ private[kafka] class AsyncProducer[T](co
       cbkHandler.close
       logger.info("Closed the callback handler")
     }
+    closed.set(true)
     queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1))
+    if(logger.isDebugEnabled)
+      logger.debug("Added shutdown command to the queue")
     sendThread.shutdown
     sendThread.awaitShutdown
     producer.close
-    closed.set(true)
     logger.info("Closed AsyncProducer")
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1160946&r1=1160945&r2=1160946&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
Wed Aug 24 00:44:22 2011
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.producer.async
 
@@ -39,6 +39,10 @@ private[kafka] class DefaultEventHandler
     if(cbkHandler != null)
       processedEvents = cbkHandler.beforeSendingData(events)
 
+    if(logger.isTraceEnabled)
+      processedEvents.foreach(event => logger.trace("Handling event for Topic: %s, Partition:
%d"
+        .format(event.getTopic, event.getPartition)))
+
     send(serialize(collate(processedEvents), serializer), syncProducer)
   }
 
@@ -54,7 +58,6 @@ private[kafka] class DefaultEventHandler
   private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
                         serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet]
= {
     val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l =>
serializer.toMessage(l))))
-    val topicsAndPartitions = eventsPerTopic.map(e => e._1)
     /** enforce the compressed.topics config here.
      *  If the compression codec is anything other than NoCompressionCodec,
      *    Enable compression only for specified topics if any
@@ -66,25 +69,29 @@ private[kafka] class DefaultEventHandler
       ((topicAndEvents._1._1, topicAndEvents._1._2),
         config.compressionCodec match {
           case NoCompressionCodec =>
-            if(logger.isDebugEnabled)
-              logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size))
+            if(logger.isTraceEnabled)
+              logger.trace("Sending %d messages with no compression to topic %s on partition
%d"
+                .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2))
             new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
           case _ =>
             config.compressedTopics.size match {
               case 0 =>
-                if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size,
config.compressionCodec.codec))
+                if(logger.isTraceEnabled)
+                  logger.trace("Sending %d messages with compression codec %d to topic %s
on partition %d"
+                    .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1,
topicAndEvents._1._2))
                 new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
               case _ =>
                 if(config.compressedTopics.contains(topicAndEvents._1._1)) {
-                  if(logger.isDebugEnabled)
-                    logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size,
config.compressionCodec.codec))
+                  if(logger.isTraceEnabled)
+                    logger.trace("Sending %d messages with compression %d to topic %s on
partition %d"
+                      .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2,
config.compressionCodec.codec))
                   new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
                 }
                 else {
-                  if(logger.isDebugEnabled)
-                    logger.debug("Sending %d messages with no compression as %s is not in
compressed.topics - %s"
-                      .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString))
+                  if(logger.isTraceEnabled)
+                    logger.trace("Sending %d messages to topic %s and partition %d with no
compression as %s is not in compressed.topics - %s"
+                      .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2,
topicAndEvents._1._1,
+                      config.compressedTopics.toString))
                   new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
                 }
             }
@@ -104,8 +111,8 @@ private[kafka] class DefaultEventHandler
       remainingEvents = topicEvents._2
       distinctPartitions.foreach { p =>
         val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition ==
p)))._1
-		if(topicPartitionEvents.size > 0)
-          collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
+        if(topicPartitionEvents.size > 0)
+          collatedEvents += ((topic, p) -> topicPartitionEvents.map(q => q.getData))
       }
     }
     collatedEvents

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1160946&r1=1160945&r2=1160946&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
Wed Aug 24 00:44:22 2011
@@ -77,6 +77,9 @@ private[async] class ProducerSendThread[
         // returns a null object
         val expired = currentQueueItem == null
         if(currentQueueItem != null) {
+          if(logger.isTraceEnabled)
+            logger.trace("Dequeued item for topic %s and partition %d"
+              .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
           // handle the dequeued current item
           if(cbkHandler != null)
             events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
@@ -97,6 +100,9 @@ private[async] class ProducerSendThread[
           events = new ListBuffer[QueueItem[T]]
         }
     }
+    if(queue.size > 0)
+      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d
remaining items in the queue"
+        .format(queue.size))
     if(cbkHandler != null) {
       logger.info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
       val addedEvents = cbkHandler.lastBatchBeforeClose



Mime
View raw message