kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3442; Fix FileMessageSet iterator.
Date Wed, 23 Mar 2016 14:16:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 255b5e138 -> 7af67ce22


KAFKA-3442; Fix FileMessageSet iterator.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #1112 from becketqin/KAFKA-3442


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7af67ce2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7af67ce2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7af67ce2

Branch: refs/heads/trunk
Commit: 7af67ce22aa02121d6b82dc54dad42358282e524
Parents: 255b5e1
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Wed Mar 23 07:15:59 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Mar 23 07:15:59 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/log/FileMessageSet.scala   | 28 ++++++++++++--------
 .../unit/kafka/log/FileMessageSetTest.scala     | 17 ++++++++++--
 2 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7af67ce2/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 45b3df9..a164b4b 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -206,7 +206,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /**
    * Convert this message set to use the specified message format.
    */
-  def toMessageFormat(toMagicValue: Byte): ByteBufferMessageSet = {
+  def toMessageFormat(toMagicValue: Byte): MessageSet = {
     val offsets = new ArrayBuffer[Long]
     val newMessages = new ArrayBuffer[Message]
     this.foreach { messageAndOffset =>
@@ -224,11 +224,16 @@ class FileMessageSet private[kafka](@volatile var file: File,
       }
     }
 
-    // We use the offset seq to assign offsets so the offset of the messages does not change.
-    new ByteBufferMessageSet(
-      compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
-      offsetSeq = offsets,
-      newMessages: _*)
+    if (sizeInBytes > 0 && newMessages.size == 0) {
+      // This indicates that the message is too large. We just return all the bytes in the file message set.
+      this
+    } else {
+      // We use the offset seq to assign offsets so the offset of the messages does not change.
+      new ByteBufferMessageSet(
+        compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec),
+        offsetSeq = offsets,
+        newMessages: _*)
+    }
   }
 
   /**
@@ -245,10 +250,11 @@ class FileMessageSet private[kafka](@volatile var file: File,
   def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
     new IteratorTemplate[MessageAndOffset] {
       var location = start
-      val sizeOffsetBuffer = ByteBuffer.allocate(12)
+      val sizeOffsetLength = 12
+      val sizeOffsetBuffer = ByteBuffer.allocate(sizeOffsetLength)
 
       override def makeNext(): MessageAndOffset = {
-        if(location >= end)
+        if(location + sizeOffsetLength >= end)
           return allDone()
 
         // read the size of the item
@@ -260,20 +266,20 @@ class FileMessageSet private[kafka](@volatile var file: File,
         sizeOffsetBuffer.rewind()
         val offset = sizeOffsetBuffer.getLong()
         val size = sizeOffsetBuffer.getInt()
-        if(size < Message.MinMessageOverhead)
+        if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end)
           return allDone()
         if(size > maxMessageSize)
           throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
 
         // read the item itself
         val buffer = ByteBuffer.allocate(size)
-        channel.read(buffer, location + 12)
+        channel.read(buffer, location + sizeOffsetLength)
         if(buffer.hasRemaining)
           return allDone()
         buffer.rewind()
 
         // increment the location and return the item
-        location += size + 12
+        location += size + sizeOffsetLength
         new MessageAndOffset(new Message(buffer), offset)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7af67ce2/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index a3e5b2d..534443c 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -133,11 +133,13 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   def testIteratorWithLimits() {
     val message = messageSet.toList(1)
     val start = messageSet.searchFor(1, 0).position
-    val size = message.message.size
+    val size = message.message.size + 12
     val slice = messageSet.read(start, size)
     assertEquals(List(message), slice.toList)
+    val slice2 = messageSet.read(start, size - 1)
+    assertEquals(List(), slice2.toList)
   }
-  
+
   /**
    * Test the truncateTo method lops off messages and appropriately updates the size
    */
@@ -203,6 +205,17 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   }
 
   @Test
+  def testFormatConversionWithPartialMessage() {
+    val message = messageSet.toList(1)
+    val start = messageSet.searchFor(1, 0).position
+    val size = message.message.size + 12
+    val slice = messageSet.read(start, size - 1)
+    val messageV0 = slice.toMessageFormat(Message.MagicValue_V0)
+    assertEquals("No message should be there", 0, messageV0.size)
+    assertEquals(s"There should be ${size - 1} bytes", size - 1, messageV0.sizeInBytes)
+  }
+
+  @Test
   def testMessageFormatConversion() {
 
     // Prepare messages.


Mime
View raw message