Author: nehanarkhede
Date: Wed Aug 24 00:44:22 2011
New Revision: 1160946
URL: http://svn.apache.org/viewvc?rev=1160946&view=rev
Log:
AsyncProducer shutdown logic causes data loss; KAFKA-116; patched by nehanarkhede; reviewed by junrao
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1160946&r1=1160945&r2=1160946&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Wed Aug 24 00:44:22 2011
@@ -90,26 +90,26 @@ private[kafka] class AsyncProducer[T](co
if(cbkHandler != null)
data = cbkHandler.beforeEnqueue(data)
- val added = if (config.enqueueTimeoutMs != 0) {
- try {
- if (config.enqueueTimeoutMs < 0) {
- queue.put(data)
- true
+ val added = config.enqueueTimeoutMs match {
+ case 0 =>
+ queue.offer(data)
+ case _ =>
+ try {
+ config.enqueueTimeoutMs < 0 match {
+ case true =>
+ queue.put(data)
+ true
+ case _ =>
+ queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+ }
}
- else {
- queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+ catch {
+ case e: InterruptedException =>
+          val msg = "%s interrupted during enqueue of event %s.".format(
+            getClass.getSimpleName, event.toString)
+ logger.error(msg)
+ throw new AsyncProducerInterruptedException(msg)
}
- }
- catch {
- case e: InterruptedException =>
-          val msg = "%s interrupted during enqueue of event %s.".format(
-            getClass.getSimpleName, event.toString)
- logger.error(msg)
- throw new AsyncProducerInterruptedException(msg)
- }
- }
- else {
- queue.offer(data)
}
if(cbkHandler != null)
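
The rewritten enqueue path keys off enqueue.timeout.ms: zero means a non-blocking offer that fails immediately when the queue is full, a negative value means a blocking put, and a positive value means a timed offer. A standalone sketch of the same three modes against a plain BlockingQueue (the helper and its name are illustrative, not the producer's API):

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

    // Illustrative only: the three enqueue modes selected by enqueue.timeout.ms.
    def tryEnqueue[T](queue: LinkedBlockingQueue[T], item: T, timeoutMs: Long): Boolean =
      if (timeoutMs == 0)
        queue.offer(item)                                   // non-blocking: fail fast if full
      else if (timeoutMs < 0) {
        queue.put(item)                                     // block until space frees up
        true
      } else
        queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS) // block up to the timeout
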
@@ -121,7 +121,7 @@ private[kafka] class AsyncProducer[T](co
      throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
}else {
if(logger.isTraceEnabled) {
- logger.trace("Added event to send queue for topic: " + topic + ":" + event.toString)
+ logger.trace("Added event to send queue for topic: " + topic + ", partition: " +
partition + ":" + event.toString)
logger.trace("Remaining queue size: " + queue.remainingCapacity)
}
}
@@ -132,11 +132,13 @@ private[kafka] class AsyncProducer[T](co
cbkHandler.close
logger.info("Closed the callback handler")
}
+ closed.set(true)
queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1))
+ if(logger.isDebugEnabled)
+ logger.debug("Added shutdown command to the queue")
sendThread.shutdown
sendThread.awaitShutdown
producer.close
- closed.set(true)
logger.info("Closed AsyncProducer")
}
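
The reordering in close() is the heart of the data-loss fix: once closed is set, a racing send() fails fast instead of enqueueing behind the Shutdown marker, where the send thread would never deliver it. A minimal sketch of the corrected ordering, with the queue, marker, and exception as stand-ins for the real fields:

    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.atomic.AtomicBoolean

    // Illustrative sketch; queue, marker and the exception are stand-ins.
    class SketchProducer[T](marker: T) {
      private val closed = new AtomicBoolean(false)
      private val queue  = new LinkedBlockingQueue[T]()

      def send(item: T): Unit = {
        if (closed.get)                // racing senders now fail fast ...
          throw new IllegalStateException("producer already closed")
        queue.put(item)                // ... instead of landing behind the marker
      }

      def close(): Unit = {
        closed.set(true)               // 1. reject further sends first
        queue.put(marker)              // 2. only then enqueue the Shutdown marker
      }
    }
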
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1160946&r1=1160945&r2=1160946&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Aug 24 00:44:22 2011
@@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package kafka.producer.async
@@ -39,6 +39,10 @@ private[kafka] class DefaultEventHandler
if(cbkHandler != null)
processedEvents = cbkHandler.beforeSendingData(events)
+ if(logger.isTraceEnabled)
+      processedEvents.foreach(event => logger.trace("Handling event for Topic: %s, Partition: %d"
+ .format(event.getTopic, event.getPartition)))
+
send(serialize(collate(processedEvents), serializer), syncProducer)
}
@@ -54,7 +58,6 @@ private[kafka] class DefaultEventHandler
  private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
                        serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
    val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
- val topicsAndPartitions = eventsPerTopic.map(e => e._1)
/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
@@ -66,25 +69,29 @@ private[kafka] class DefaultEventHandler
((topicAndEvents._1._1, topicAndEvents._1._2),
config.compressionCodec match {
case NoCompressionCodec =>
- if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size))
+ if(logger.isTraceEnabled)
+ logger.trace("Sending %d messages with no compression to topic %s on partition
%d"
+ .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2))
new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
case _ =>
config.compressedTopics.size match {
case 0 =>
- if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size,
config.compressionCodec.codec))
+ if(logger.isTraceEnabled)
+ logger.trace("Sending %d messages with compression codec %d to topic %s
on partition %d"
+ .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1,
topicAndEvents._1._2))
new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
case _ =>
if(config.compressedTopics.contains(topicAndEvents._1._1)) {
- if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size,
config.compressionCodec.codec))
+ if(logger.isTraceEnabled)
+ logger.trace("Sending %d messages with compression %d to topic %s on
partition %d"
+ .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2,
config.compressionCodec.codec))
new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
}
else {
- if(logger.isDebugEnabled)
- logger.debug("Sending %d messages with no compression as %s is not in
compressed.topics - %s"
- .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString))
+ if(logger.isTraceEnabled)
+ logger.trace("Sending %d messages to topic %s and partition %d with no
compression as %s is not in compressed.topics - %s"
+ .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2,
topicAndEvents._1._1,
+ config.compressedTopics.toString))
new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
}
}
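
The branching above enforces compressed.topics: with NoCompressionCodec nothing is compressed; with a real codec and an empty compressed.topics list every topic is compressed; otherwise only the listed topics are. The same rule distilled into a small helper (the codec types are the ones used in the diff; the function itself is hypothetical):

    import kafka.message.{CompressionCodec, NoCompressionCodec}

    // Illustrative distillation of the codec-selection rule enforced above.
    def codecFor(topic: String,
                 configured: CompressionCodec,
                 compressedTopics: Seq[String]): CompressionCodec =
      if (configured == NoCompressionCodec) NoCompressionCodec // compression disabled
      else if (compressedTopics.isEmpty) configured            // empty list: compress all topics
      else if (compressedTopics.contains(topic)) configured    // topic is opted in
      else NoCompressionCodec                                  // topic not in compressed.topics
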
@@ -104,8 +111,8 @@ private[kafka] class DefaultEventHandler
remainingEvents = topicEvents._2
distinctPartitions.foreach { p =>
        val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
- if(topicPartitionEvents.size > 0)
- collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
+ if(topicPartitionEvents.size > 0)
+ collatedEvents += ((topic, p) -> topicPartitionEvents.map(q => q.getData))
}
}
collatedEvents
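
The collate step groups queued events by (topic, partition) before serialization. For reference, an equivalent, illustrative formulation using groupBy (Item is a stand-in for QueueItem[T]):

    // Illustrative stand-in for QueueItem[T].
    case class Item[T](topic: String, partition: Int, data: T)

    // Equivalent in effect to the hand-rolled collate loop: group the
    // payloads by (topic, partition).
    def collate[T](events: Seq[Item[T]]): Map[(String, Int), Seq[T]] =
      events.groupBy(e => (e.topic, e.partition))
            .map { case (key, items) => key -> items.map(_.data) }
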
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1160946&r1=1160945&r2=1160946&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Aug 24 00:44:22 2011
@@ -77,6 +77,9 @@ private[async] class ProducerSendThread[
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
+ if(logger.isTraceEnabled)
+ logger.trace("Dequeued item for topic %s and partition %d"
+ .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
// handle the dequeued current item
if(cbkHandler != null)
events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
@@ -97,6 +100,9 @@ private[async] class ProducerSendThread[
events = new ListBuffer[QueueItem[T]]
}
}
+ if(queue.size > 0)
+      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
+ .format(queue.size))
if(cbkHandler != null) {
logger.info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
val addedEvents = cbkHandler.lastBatchBeforeClose
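
The new queue.size check turns the shutdown contract into a loud invariant: because close() now sets closed before enqueueing the Shutdown marker, the marker must be the last item, and anything found behind it would have been silently dropped. A stripped-down sketch of that sentinel-draining loop (names are illustrative):

    import java.util.concurrent.LinkedBlockingQueue

    // Illustrative drain loop: process items until the sentinel, then assert
    // that nothing slipped in behind it (which close() now guarantees).
    def drain[T](queue: LinkedBlockingQueue[T], sentinel: T)(handle: T => Unit): Unit = {
      var item = queue.take()
      while (item != sentinel) {
        handle(item)
        item = queue.take()
      }
      if (queue.size > 0)
        throw new IllegalStateException(
          "%d remaining items in the queue after shutdown".format(queue.size))
    }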