kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-3933; Always fully read deepIterator
Date Tue, 26 Jul 2016 01:58:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk aebab7cab -> 8a417c89d


KAFKA-3933; Always fully read deepIterator

Avoids leaking native memory, which crashed brokers on bootup once they
ran out of memory.

Seeing as `messageFormat > 0` always reads the full compressed message
set and is the default going forwards, we can use that behaviour to
always close the compressor when calling `deepIterator`.
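
For illustration only, a minimal sketch of the close-in-a-`finally`
pattern the patch adopts, using `java.util.zip.GZIPInputStream` as a
stand-in for the stream built by `CompressionFactory`; the `swallow` and
`drain` helpers here are hypothetical (modelled on `CoreUtils.swallow`),
not Kafka APIs:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object CloseCompressorSketch {

  // Modelled on CoreUtils.swallow: run a cleanup action and ignore any
  // exception it throws, so a failing close() cannot mask an earlier error.
  def swallow(action: => Unit): Unit =
    try action catch { case e: Throwable => println(s"swallowed: $e") }

  // Read a compressed payload to EOF, as deepIterator now always does, and
  // close the decompressor unconditionally. Streams like this hold native
  // (off-heap) buffers that only close() releases promptly; waiting for the
  // GC to finalize them is how native memory piled up.
  def drain(compressedBytes: Array[Byte]): Int = {
    val in = new GZIPInputStream(new ByteArrayInputStream(compressedBytes))
    var count = 0
    try {
      while (in.read() != -1) count += 1
      count
    } finally {
      swallow(in.close()) // always runs, even if read() throws mid-stream
    }
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(out)
    gzip.write("hello".getBytes("UTF-8"))
    gzip.close()
    println(drain(out.toByteArray)) // prints 5
  }
}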

Author: Tom Crayford <tcrayford@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1660 from tcrayford/dont_leak_native_memory_round_2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8a417c89
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8a417c89
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8a417c89

Branch: refs/heads/trunk
Commit: 8a417c89d2f0b7861b2dec26f02e4e302b64b604
Parents: aebab7c
Author: Tom Crayford <tcrayford@googlemail.com>
Authored: Tue Jul 26 02:31:37 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jul 26 02:41:46 2016 +0100

----------------------------------------------------------------------
 .../kafka/message/ByteBufferMessageSet.scala    | 69 ++++++++++----------
 1 file changed, 34 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8a417c89/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index a116d4b..98f6131 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -17,7 +17,7 @@
 
 package kafka.message
 
-import kafka.utils.{IteratorTemplate, Logging}
+import kafka.utils.{CoreUtils, IteratorTemplate, Logging}
 import kafka.common.{KafkaException, LongRef}
 import java.nio.ByteBuffer
 import java.nio.channels._
@@ -85,36 +85,45 @@ object ByteBufferMessageSet {
     new IteratorTemplate[MessageAndOffset] {
 
       val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset
+
+      if (wrapperMessage.payload == null)
+        throw new KafkaException(s"Message payload is null: $wrapperMessage")
+
       val wrapperMessageTimestampOpt: Option[Long] =
        if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
      val wrapperMessageTimestampTypeOpt: Option[TimestampType] =
        if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None
-      if (wrapperMessage.payload == null)
-        throw new KafkaException(s"Message payload is null: $wrapperMessage")
-      val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
-      val compressed = try {
-        new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
-      } catch {
-        case ioe: IOException =>
-          throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
-      }
+
       var lastInnerOffset = -1L
 
-      val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
+      val messageAndOffsets = {
+        val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
+        val compressed = try {
+          new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
+        } catch {
+          case ioe: IOException =>
+            throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe)
+        }
+
         val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]()
         try {
           while (true)
-            innerMessageAndOffsets.add(readMessageFromStream())
+            innerMessageAndOffsets.add(readMessageFromStream(compressed))
         } catch {
           case eofe: EOFException =>
-            compressed.close()
+            // we don't do anything at all here, because the finally
+            // will close the compressed input stream, and we simply
+            // want to return the innerMessageAndOffsets
           case ioe: IOException =>
             throw new InvalidMessageException(s"Error while reading message from stream compressed
with ${wrapperMessage.compressionCodec}", ioe)
+        } finally {
+          CoreUtils.swallow(compressed.close())
         }
-        Some(innerMessageAndOffsets)
-      } else None
 
-      private def readMessageFromStream(): MessageAndOffset = {
+        innerMessageAndOffsets
+      }
+
+      private def readMessageFromStream(compressed: DataInputStream): MessageAndOffset = {
         val innerOffset = compressed.readLong()
         val recordSize = compressed.readInt()
 
@@ -138,25 +147,15 @@ object ByteBufferMessageSet {
       }
 
       override def makeNext(): MessageAndOffset = {
-        messageAndOffsets match {
-          // Using inner offset and timestamps
-          case Some(innerMessageAndOffsets) =>
-            innerMessageAndOffsets.pollFirst() match {
-              case null => allDone()
-              case MessageAndOffset(message, offset) =>
-                val relativeOffset = offset - lastInnerOffset
-                val absoluteOffset = wrapperMessageOffset + relativeOffset
-                new MessageAndOffset(message, absoluteOffset)
-            }
-          // Not using inner offset and timestamps
-          case None =>
-            try readMessageFromStream()
-            catch {
-              case eofe: EOFException =>
-                compressed.close()
-                allDone()
-              case ioe: IOException =>
-                throw new KafkaException(ioe)
+        messageAndOffsets.pollFirst() match {
+          case null => allDone()
+          case nextMessage@ MessageAndOffset(message, offset) =>
+            if (wrapperMessage.magic > MagicValue_V0) {
+              val relativeOffset = offset - lastInnerOffset
+              val absoluteOffset = wrapperMessageOffset + relativeOffset
+              new MessageAndOffset(message, absoluteOffset)
+            } else {
+              nextMessage
             }
         }
       }

