Author: nehanarkhede
Date: Fri Aug 19 00:47:34 2011
New Revision: 1159459
URL: http://svn.apache.org/viewvc?rev=1159459&view=rev
Log:
CompressionUtils introduces a GZIP header while compressing empty message sets KAFKA-109;
patched by Neha; reviewed by Jun
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Fri Aug 19 00:47:34 2011
@@ -31,24 +31,8 @@ class ByteBufferMessageSet(private val b
def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
- this(compressionCodec match {
- case NoCompressionCodec =>
- val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
- val messageIterator = messages.iterator
- while(messageIterator.hasNext) {
- val message = messageIterator.next
- message.serializeTo(buffer)
- }
- buffer.rewind
- buffer
- case _ =>
- import scala.collection.JavaConversions._
- val message = CompressionUtils.compress(asBuffer(messages), compressionCodec)
- val buffer = ByteBuffer.allocate(message.serializedSize)
- message.serializeTo(buffer)
- buffer.rewind
- buffer
- }, 0L, ErrorMapping.NoError)
+ this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
+ 0L, ErrorMapping.NoError)
}
def this(messages: java.util.List[Message]) {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Aug 19 00:47:34 2011
@@ -43,22 +43,7 @@ class ByteBufferMessageSet(private val b
private var deepValidByteCount = -1L
def this(compressionCodec: CompressionCodec, messages: Message*) {
- this(
- compressionCodec match {
- case NoCompressionCodec =>
- val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
- for (message <- messages) {
- message.serializeTo(buffer)
- }
- buffer.rewind
- buffer
- case _ =>
- val message = CompressionUtils.compress(messages, compressionCodec)
- val buffer = ByteBuffer.allocate(message.serializedSize)
- message.serializeTo(buffer)
- buffer.rewind
- buffer
- }, 0L, ErrorMapping.NoError)
+ this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
}
def this(messages: Message*) {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala Fri Aug 19 00:47:34 2011
@@ -51,7 +51,30 @@ object MessageSet {
* The size of a size-delimited entry in a message set
*/
def entrySize(message: Message): Int = LogOverhead + message.size
-
+
+ def createByteBuffer(compressionCodec: CompressionCodec, messages: Message*): ByteBuffer =
+ compressionCodec match {
+ case NoCompressionCodec =>
+ val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ for (message <- messages) {
+ message.serializeTo(buffer)
+ }
+ buffer.rewind
+ buffer
+ case _ =>
+ messages.size match {
+ case 0 =>
+ val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ buffer.rewind
+ buffer
+ case _ =>
+ val message = CompressionUtils.compress(messages, compressionCodec)
+ val buffer = ByteBuffer.allocate(message.serializedSize)
+ message.serializeTo(buffer)
+ buffer.rewind
+ buffer
+ }
+ }
}
/**
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1159459&r1=1159458&r2=1159459&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala Fri Aug 19 00:47:34 2011
@@ -69,7 +69,7 @@ trait BaseMessageSetTestCases extends JU
@Test
def testSizeInBytesWithCompression () {
assertEquals("Empty message set should have 0 bytes.",
- 30L, // overhead of the GZIP output stream
+ 0L, // an empty message set is not compressed, so there is no GZIP overhead
createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes)
}
}
|