kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1159466 - /incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Date Fri, 19 Aug 2011 01:09:26 GMT
Author: nehanarkhede
Date: Fri Aug 19 01:09:26 2011
New Revision: 1159466

URL: http://svn.apache.org/viewvc?rev=1159466&view=rev
Log:
Fix ByteBufferMessageSet iterator bug that returned incorrect offsets after reading a compressed empty
message set (KAFKA-111); patched by Jun; reviewed by Neha

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1159466&r1=1159465&r2=1159466&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Fri Aug 19 01:09:26 2011
@@ -111,9 +111,9 @@ class ByteBufferMessageSet(private val b
         if(size < 0 || topIter.remaining < size) {
           deepValidByteCount = currValidBytes
           if (currValidBytes == 0 || size < 0)
-            throw new InvalidMessageSizeException("invalid message size: %d only received bytes: %d " +
-              " at %d possible causes (1) a single message larger than the fetch size; (2) log corruption "
-                .format(size, topIter.remaining, currValidBytes))
+            throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
+              topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
+              "the fetch size; (2) log corruption )")
           return allDone()
         }
         val message = topIter.slice()
@@ -126,23 +126,30 @@ class ByteBufferMessageSet(private val b
               logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
             innerIter = null
             currValidBytes += 4 + size
+            if(logger.isTraceEnabled)
+              logger.trace("currValidBytes = " + currValidBytes)
             new MessageAndOffset(newMessage, currValidBytes)
           case _ =>
             if(logger.isDebugEnabled)
               logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
             innerIter = CompressionUtils.decompress(newMessage).deepIterator
+            if (!innerIter.hasNext) {
+              currValidBytes += 4 + lastMessageSize
+              innerIter = null
+            }
             makeNext()
         }
       }
 
       override def makeNext(): MessageAndOffset = {
+        val isInnerDone = innerDone()
         if(logger.isDebugEnabled)
-          logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
-        innerDone match {
+          logger.debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
+        isInnerDone match {
           case true => makeNextOuter
           case false => {
             val messageAndOffset = innerIter.next
-            if(!innerIter.hasNext)
+            if (!innerIter.hasNext)
               currValidBytes += 4 + lastMessageSize
             new MessageAndOffset(messageAndOffset.message, currValidBytes)
           }



Mime
View raw message