kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-2477: Fix a race condition between log append and fetch that causes OffsetOutOfRangeException.
Date Thu, 08 Oct 2015 04:59:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 693d4ca1a -> f1110c3fb


KAFKA-2477: Fix a race condition between log append and fetch that causes OffsetOutOfRangeException.

Tried two fixes. I prefer the second approach because it saves an additional offset search.

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #204 from becketqin/KAFKA-2477
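
The gist of the fix, as a minimal self-contained Scala sketch (the names and fields below are illustrative stand-ins, not Kafka's classes): an append makes its bytes visible in the active segment's file before the log end offset metadata is published, so a fetch landing in that window could return messages beyond the published end offset, and a follow-up fetch at the returned "next" offset then failed with OffsetOutOfRangeException. The chosen fix caps reads of the active segment at the already-published position.

    object AppendFetchRaceSketch {
      // Simplified stand-ins for the pieces involved in the race (hypothetical, for illustration only).
      @volatile var logEndOffset: Long   = 100L  // nextOffsetMetadata.messageOffset
      @volatile var exposedPosition: Int = 4096  // segment position already published to readers
      @volatile var segmentFileSize: Int = 4096  // physical size of the active segment file

      // An append becomes visible in two steps: the bytes reach the file first...
      def appendBytes(n: Int): Unit = segmentFileSize += n
      // ...and only later is the end-offset metadata published.
      def publishAppend(newEndOffset: Long): Unit = {
        exposedPosition = segmentFileSize
        logEndOffset = newEndOffset
      }

      // The idea of the fix: a fetch on the active segment reads at most up to the published
      // position, never up to the raw file size, so it cannot hand out messages whose offsets
      // are not yet reflected in the log end offset.
      def maxReadablePosition: Int = exposedPosition

      def main(args: Array[String]): Unit = {
        appendBytes(512)             // a fetch arriving in this window used to see the new bytes
        println(maxReadablePosition) // 4096: the unpublished bytes stay invisible
        publishAppend(101L)
        println(maxReadablePosition) // 4608: visible once the metadata is published
      }
    }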


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1110c3f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1110c3f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1110c3f

Branch: refs/heads/trunk
Commit: f1110c3fbb166f94204b6bb18bc4e1a9100d3c4e
Parents: 693d4ca
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Wed Oct 7 21:59:14 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 7 21:59:14 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala        | 39 ++++++++++++++++-----
 core/src/main/scala/kafka/log/LogSegment.scala |  9 ++---
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1110c3f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e5e8007..02205c9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -470,22 +470,41 @@ class Log(val dir: File,
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
 
-    // check if the offset is valid and in range
-    val next = nextOffsetMetadata.messageOffset
+    // Because we don't use lock for reading, the synchronization is a little bit tricky.
+    // We create the local variables to avoid race conditions with updates to the log.
+    val currentNextOffsetMetadata = nextOffsetMetadata
+    val next = currentNextOffsetMetadata.messageOffset
     if(startOffset == next)
-      return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
-    
+      return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty)
+
     var entry = segments.floorEntry(startOffset)
-      
+
     // attempt to read beyond the log end offset is an error
     if(startOffset > next || entry == null)
       throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
     
-    // do the read on the segment with a base offset less than the target offset
+    // Do the read on the segment with a base offset less than the target offset
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
     while(entry != null) {
-      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
+      // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
+      // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
+      // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
+      // end of the active segment.
+      val maxPosition = {
+        if (entry == segments.lastEntry) {
+          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
+          // Check the segment again in case a new segment has just rolled out.
+          if (entry != segments.lastEntry)
+            // New log segment has rolled out, we can read up to the file end.
+            entry.getValue.size
+          else
+            exposedPos
+        } else {
+          entry.getValue.size
+        }
+      }
+      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
       if(fetchInfo == null) {
         entry = segments.higherEntry(entry.getKey)
       } else {
@@ -622,12 +641,14 @@ class Log(val dir: File,
       val prev = addSegment(segment)
       if(prev != null)
         throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
-      
+      // We need to update the segment base offset and append position data of the metadata when log rolls.
+      // The next offset should not change.
+      updateLogEndOffset(nextOffsetMetadata.messageOffset)
       // schedule an asynchronous flush of the old segment
       scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
       
       info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
-      
+
       segment
     }
   }
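
The double-check of segments.lastEntry in the first hunk is worth a note: the exposed position is captured first, and only then is the "still the active segment?" check repeated. If a roll slipped in between, the published metadata likely already describes the new segment (see the roll hunk above, which calls updateLogEndOffset), so the captured position would not be meaningful for the old segment; but that old segment is immutable after the roll, so reading up to its file size is safe. A rough, runnable illustration of the ordering, with made-up types and numbers rather than Kafka's own:

    object RollRecheckSketch {
      import java.util.concurrent.ConcurrentSkipListMap

      final case class Segment(baseOffset: Long, size: Int)

      val segments = new ConcurrentSkipListMap[Long, Segment]()
      segments.put(0L, Segment(baseOffset = 0L, size = 4096))

      // Stand-in for nextOffsetMetadata.relativePositionInSegment: bytes already
      // published to readers in the *active* segment.
      @volatile var exposedPositionInActiveSegment: Long = 3500L

      def maxPosition(entry: java.util.Map.Entry[Long, Segment]): Long =
        if (entry == segments.lastEntry) {
          val exposedPos = exposedPositionInActiveSegment
          // Re-check: if a roll happened after exposedPos was captured, that position now
          // describes the new active segment, but our segment is immutable from here on,
          // so its full file size is safe to read.
          if (entry != segments.lastEntry) entry.getValue.size.toLong
          else exposedPos
        } else {
          entry.getValue.size.toLong // older, immutable segment: read it all
        }

      def main(args: Array[String]): Unit = {
        println(maxPosition(segments.floorEntry(0L))) // 3500: capped at the exposed position
        segments.put(100L, Segment(baseOffset = 100L, size = 0)) // the log rolls
        println(maxPosition(segments.floorEntry(0L))) // 4096: the old segment is now read-only
      }
    }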

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1110c3f/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 1377e8f..4de4c2b 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -112,12 +112,13 @@ class LogSegment(val log: FileMessageSet,
    * @param startOffset A lower bound on the first offset to include in the message set we read
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
+   * @param maxPosition An optional maximum position in the log segment that should be exposed for read.
    * 
    * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
    *         or null if the startOffset is larger than the largest offset in this log
    */
   @threadsafe
-  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
+  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo = {
     if(maxSize < 0)
       throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
 
@@ -138,8 +139,8 @@ class LogSegment(val log: FileMessageSet,
     val length = 
       maxOffset match {
         case None =>
-          // no max offset, just use the max size they gave unmolested
-          maxSize
+          // no max offset, just read until the max position
+          min((maxPosition - startPosition.position).toInt, maxSize)
         case Some(offset) => {
           // there is a max offset, translate it to a file position and use that to calculate the max read size
           if(offset < startOffset)
@@ -150,7 +151,7 @@ class LogSegment(val log: FileMessageSet,
               logSize // the max offset is off the end of the log, use the end of the file
             else
               mapping.position
-          min(endPosition - startPosition.position, maxSize) 
+          min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
         }
       }
     FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
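
As a worked example of the new length bound (numbers made up for illustration): with a start position of 1000, an end position of 2000 translated from maxOffset, an exposed maxPosition of 1500 and a fetch maxSize of 4096, the read length becomes min(min(1500, 2000) - 1000, 4096) = 500 bytes, so the fetch stops at the exposed position rather than at the maxOffset bound or the requested size.

    object ReadLengthSketch {
      // Hypothetical values, purely to illustrate the formula in LogSegment.read.
      val startPosition = 1000   // file position of the first message at or after startOffset
      val endPosition   = 2000   // file position translated from the maxOffset bound
      val maxPosition   = 1500L  // exposed position on the active segment (the new cap)
      val maxSize       = 4096   // maximum number of bytes requested by the fetch

      // min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
      val length: Int =
        math.min(math.min(maxPosition, endPosition.toLong) - startPosition, maxSize.toLong).toInt

      def main(args: Array[String]): Unit =
        println(length) // 500: the read is capped at the exposed position
    }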

