kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-8717; Reuse cached offset metadata when reading from log (#7081)
Date: Tue, 30 Jul 2019 15:50:28 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a48b5d9  KAFKA-8717; Reuse cached offset metadata when reading from log (#7081)
a48b5d9 is described below

commit a48b5d900c6b5c9c52a97124a1b51aff3636c32c
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Jul 30 08:50:13 2019 -0700

    KAFKA-8717; Reuse cached offset metadata when reading from log (#7081)
    
    Although we currently cache offset metadata for the high watermark and last stable offset, we don't use it when reading from the log; instead, we always look it up from the index. This patch pushes fetch isolation into `Log.read` so that we can reuse the cached offset metadata.
    
    Reviewers: Jun Rao <junrao@gmail.com>
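
For orientation, here is a minimal caller-side sketch of the API change this commit makes. It mirrors the Partition.scala hunk in the diff below; the helper name readWithIsolation is illustrative, while the Log.read signature and the FetchIsolation cases (FetchLogEnd, FetchHighWatermark, FetchTxnCommitted) come from the patch itself:

    import kafka.log.Log
    import kafka.server.{FetchDataInfo, FetchIsolation}

    // Before this patch, each caller resolved the upper offset bound itself and
    // passed it to Log.read, which translated the offset through the index:
    //   val maxOffsetOpt = fetchIsolation match {
    //     case FetchLogEnd        => None
    //     case FetchHighWatermark => Some(initialHighWatermark)
    //     case FetchTxnCommitted  => Some(initialLastStableOffset)
    //   }
    //   localLog.read(fetchOffset, maxBytes, maxOffsetOpt, minOneMessage,
    //     includeAbortedTxns = fetchIsolation == FetchTxnCommitted)

    // After this patch, the caller passes the isolation level and Log.read resolves
    // the bound from its cached LogOffsetMetadata (high watermark / last stable offset).
    def readWithIsolation(log: Log,
                          fetchOffset: Long,
                          maxBytes: Int,
                          isolation: FetchIsolation,
                          minOneMessage: Boolean): FetchDataInfo =
      log.read(fetchOffset, maxBytes, isolation, minOneMessage)

Because the bound is now resolved inside Log.read from the cached LogOffsetMetadata, the common fetch path no longer needs an index lookup for the high watermark or last stable offset.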
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  18 +-
 .../coordinator/group/GroupMetadataManager.scala   |   8 +-
 .../transaction/TransactionStateManager.scala      |  14 +-
 core/src/main/scala/kafka/log/Log.scala            |  99 ++++++----
 core/src/main/scala/kafka/log/LogSegment.scala     |  32 +---
 .../src/test/scala/other/kafka/StressTestLog.scala |   9 +-
 .../group/GroupMetadataManagerTest.scala           |  15 +-
 .../TransactionCoordinatorConcurrencyTest.scala    |   7 +-
 .../transaction/TransactionStateManagerTest.scala  |   7 +-
 .../unit/kafka/log/BrokerCompressionTest.scala     |   8 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   4 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  46 ++---
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 208 ++++++++++++++++++---
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  10 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  10 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |   4 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   4 +-
 17 files changed, 314 insertions(+), 189 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f18ddd2..87cbe54 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -948,25 +948,13 @@ class Partition(val topicPartition: TopicPartition,
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
-    /* Read the LogOffsetMetadata prior to performing the read from the log.
-     * We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
-     * Using the log end offset after performing the read can lead to a race condition
-     * where data gets appended to the log immediately after the replica has consumed from it
-     * This can cause a replica to always be out of sync.
-     */
+    // Note we use the log end offset prior to the read. This ensures that any appends following
+    // the fetch do not prevent a follower from coming into sync.
     val initialHighWatermark = localLog.highWatermark
     val initialLogStartOffset = localLog.logStartOffset
     val initialLogEndOffset = localLog.logEndOffset
     val initialLastStableOffset = localLog.lastStableOffset
-
-    val maxOffsetOpt = fetchIsolation match {
-      case FetchLogEnd => None
-      case FetchHighWatermark => Some(initialHighWatermark)
-      case FetchTxnCommitted => Some(initialLastStableOffset)
-    }
-
-    val fetchedData = localLog.read(fetchOffset, maxBytes, maxOffsetOpt, minOneMessage,
-          includeAbortedTxns = fetchIsolation == FetchTxnCommitted)
+    val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
 
     LogReadInfo(
       fetchedData = fetchedData,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c666975..03c3e37 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.ReplicaManager
+import kafka.server.{FetchLogEnd, ReplicaManager}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -529,8 +529,10 @@ class GroupMetadataManager(brokerId: Int,
         val removedGroups = mutable.Set[String]()
 
         while (currOffset < logEndOffset && !shuttingDown.get()) {
-          val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None,
-            minOneMessage = true, includeAbortedTxns = false)
+          val fetchDataInfo = log.read(currOffset,
+            maxLength = config.loadBufferSize,
+            isolation = FetchLogEnd,
+            minOneMessage = true)
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 92cba50..45ee4e9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -24,8 +24,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.log.LogConfig
 import kafka.message.UncompressedCodec
-import kafka.server.Defaults
-import kafka.server.ReplicaManager
+import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.zk.KafkaZkClient
@@ -297,12 +296,13 @@ class TransactionStateManager(brokerId: Int,
         var currOffset = log.logStartOffset
 
         try {
-          while (currOffset < logEndOffset
-            && !shuttingDown.get()
-            && inReadLock(stateLock) {loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
+          while (currOffset < logEndOffset && !shuttingDown.get() && inReadLock(stateLock) {
+            loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
               idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
-            val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None,
-              minOneMessage = true, includeAbortedTxns = false)
+            val fetchDataInfo = log.read(currOffset,
+              maxLength = config.transactionLogLoadBufferSize,
+              isolation = FetchLogEnd,
+              minOneMessage = true)
             val memRecords = fetchDataInfo.records match {
               case records: MemoryRecords => records
               case fileRecords: FileRecords =>
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5e12e31..f8df211 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -34,7 +34,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -844,6 +844,8 @@ class Log(@volatile var dir: File,
       // and we can skip the loading. This is an optimization for users which are not yet using
       // idempotent/transactional features yet.
       if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
+        val segmentOfLastOffset = floorLogSegment(lastOffset)
+
         logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
           val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
           producerStateManager.updateMapEndOffset(startOffset)
@@ -851,7 +853,18 @@ class Log(@volatile var dir: File,
           if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
             producerStateManager.takeSnapshot()
 
-          val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
+          val maxPosition = if (segmentOfLastOffset.contains(segment)) {
+            Option(segment.translateOffset(lastOffset))
+              .map(_.position)
+              .getOrElse(segment.size)
+          } else {
+            segment.size
+          }
+
+          val fetchDataInfo = segment.read(startOffset,
+            maxSize = Int.MaxValue,
+            maxPosition = maxPosition,
+            minOneMessage = false)
           if (fetchDataInfo != null)
             loadProducersFromLog(producerStateManager, fetchDataInfo.records)
         }
@@ -1389,43 +1402,60 @@ class Log(@volatile var dir: File,
     }
   }
 
+  private def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
+                                 includeAbortedTxns: Boolean): FetchDataInfo = {
+    val abortedTransactions =
+      if (includeAbortedTxns) Some(List.empty[AbortedTransaction])
+      else None
+    FetchDataInfo(fetchOffsetMetadata,
+      MemoryRecords.EMPTY,
+      firstEntryIncomplete = false,
+      abortedTransactions = abortedTransactions)
+  }
+
   /**
    * Read messages from the log.
    *
    * @param startOffset The offset to begin reading at
    * @param maxLength The maximum number of bytes to read
-   * @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
+   * @param isolation The fetch isolation, which controls the maximum offset we are allowed to read
    * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
-   * @param includeAbortedTxns Whether or not to lookup aborted transactions for fetched data
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
    * @return The fetch data information including fetch starting offset metadata and messages read.
    */
   def read(startOffset: Long,
            maxLength: Int,
-           maxOffset: Option[Long],
-           minOneMessage: Boolean,
-           includeAbortedTxns: Boolean): FetchDataInfo = {
+           isolation: FetchIsolation,
+           minOneMessage: Boolean): FetchDataInfo = {
     maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
       trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")
 
-      // Because we don't use lock for reading, the synchronization is a little bit tricky.
+      val includeAbortedTxns = isolation == FetchTxnCommitted
+
+      // Because we don't use the lock for reading, the synchronization is a little bit tricky.
       // We create the local variables to avoid race conditions with updates to the log.
-      val currentNextOffsetMetadata = nextOffsetMetadata
-      val next = currentNextOffsetMetadata.messageOffset
-      if (startOffset == next) {
-        val abortedTransactions =
-          if (includeAbortedTxns) Some(List.empty[AbortedTransaction])
-          else None
-        return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,
-          abortedTransactions = abortedTransactions)
-      }
+      val endOffsetMetadata = nextOffsetMetadata
+      val endOffset = nextOffsetMetadata.messageOffset
+      if (startOffset == endOffset)
+        return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)
 
       var segmentEntry = segments.floorEntry(startOffset)
 
       // return error on attempt to read beyond the log end offset or read below log start offset
-      if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
+      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
         throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
-          s"but we only have log segments in the range $logStartOffset to $next.")
+          s"but we only have log segments in the range $logStartOffset to $endOffset.")
+
+      val maxOffsetMetadata = isolation match {
+        case FetchLogEnd => nextOffsetMetadata
+        case FetchHighWatermark => fetchHighWatermarkMetadata
+        case FetchTxnCommitted => fetchLastStableOffsetMetadata
+      }
+
+      if (startOffset > maxOffsetMetadata.messageOffset) {
+        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
+        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
+      }
 
       // Do the read on the segment with a base offset less than the target offset
       // but if that segment doesn't contain any messages with an offset greater than that
@@ -1433,24 +1463,16 @@ class Log(@volatile var dir: File,
       while (segmentEntry != null) {
         val segment = segmentEntry.getValue
 
-        // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
-        // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
-        // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
-        // end of the active segment.
         val maxPosition = {
-          if (segmentEntry == segments.lastEntry) {
-            val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
-            // Check the segment again in case a new segment has just rolled out.
-            if (segmentEntry != segments.lastEntry)
-            // New log segment has rolled out, we can read up to the file end.
-              segment.size
-            else
-              exposedPos
+          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
+          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
+            maxOffsetMetadata.relativePositionInSegment
           } else {
             segment.size
           }
         }
-        val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
+
+        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
         if (fetchInfo == null) {
           segmentEntry = segments.higherEntry(segmentEntry.getKey)
         } else {
@@ -1623,9 +1645,8 @@ class Log(@volatile var dir: File,
   private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
     val fetchDataInfo = read(offset,
       maxLength = 1,
-      maxOffset = None,
-      minOneMessage = false,
-      includeAbortedTxns = false)
+      isolation = FetchLogEnd,
+      minOneMessage = false)
     fetchDataInfo.fetchOffsetMetadata
   }
 
@@ -2086,6 +2107,14 @@ class Log(@volatile var dir: File,
     }
   }
 
+  /**
+   * Get the largest log segment with a base offset less than or equal to the given offset, if one exists.
+   * @return the optional log segment
+   */
+  private def floorLogSegment(offset: Long): Option[LogSegment] = {
+    Option(segments.floorEntry(offset)).map(_.getValue)
+  }
+
   override def toString: String = {
     val logString = new StringBuilder
     logString.append(s"Log(dir=$dir")
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 37b2f9d..4c16291 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -43,8 +43,8 @@ import scala.math._
  * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
  *
  * @param log The file records containing log entries
- * @param offsetIndex The offset index
- * @param timeIndex The timestamp index
+ * @param lazyOffsetIndex The offset index
+ * @param lazyTimeIndex The timestamp index
  * @param txnIndex The transaction index
  * @param baseOffset A lower bound on the offsets in this segment
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
@@ -279,7 +279,6 @@ class LogSegment private[log] (val log: FileRecords,
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
    *
    * @param startOffset A lower bound on the first offset to include in the message set we read
-   * @param maxOffset An optional maximum offset for the message set we read
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxPosition The maximum position in the log segment that should be exposed for read
    * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
@@ -288,12 +287,13 @@ class LogSegment private[log] (val log: FileRecords,
    *         or null if the startOffset is larger than the largest offset in this log
    */
   @threadsafe
-  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
+  def read(startOffset: Long,
+           maxSize: Int,
+           maxPosition: Long = size,
            minOneMessage: Boolean = false): FetchDataInfo = {
     if (maxSize < 0)
       throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
 
-    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
     val startOffsetAndSize = translateOffset(startOffset)
 
     // if the start position is already off the end of the log, return null
@@ -312,25 +312,7 @@ class LogSegment private[log] (val log: FileRecords,
       return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
 
     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
-    val fetchSize: Int = maxOffset match {
-      case None =>
-        // no max offset, just read until the max position
-        min((maxPosition - startPosition).toInt, adjustedMaxSize)
-      case Some(offset) =>
-        // there is a max offset, translate it to a file position and use that to calculate the max read size;
-        // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
-        // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
-        // offset between new leader's high watermark and the log end offset, we want to return an empty response.
-        if (offset < startOffset)
-          return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
-        val mapping = translateOffset(offset, startPosition)
-        val endPosition =
-          if (mapping == null)
-            logSize // the max offset is off the end of the log, use the end of the file
-          else
-            mapping.position
-        min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
-    }
+    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
 
     FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
       firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
@@ -467,7 +449,7 @@ class LogSegment private[log] (val log: FileRecords,
    */
   @threadsafe
   def readNextOffset: Long = {
-    val fetchData = read(offsetIndex.lastOffset, None, log.sizeInBytes)
+    val fetchData = read(offsetIndex.lastOffset, log.sizeInBytes)
     if (fetchData == null)
       baseOffset
     else
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 7821a0e..6ff3a98 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic._
 
 import kafka.log._
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.record.FileRecords
@@ -132,10 +132,9 @@ object StressTestLog {
     override def work() {
       try {
         log.read(currentOffset,
-          maxLength = 1024,
-          maxOffset = Some(currentOffset + 1),
-          minOneMessage = true,
-          includeAbortedTxns = false).records match {
+          maxLength = 1,
+          isolation = FetchLogEnd,
+          minOneMessage = true).records match {
           case read: FileRecords if read.sizeInBytes > 0 => {
             val first = read.batches.iterator.next()
             require(first.lastOffset == currentOffset, "We should either read nothing or the message we asked for.")
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 4e06716..faca447 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -23,12 +23,12 @@ import java.nio.ByteBuffer
 import java.util.Collections
 import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
+
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.log.{Log, LogAppendInfo}
-import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
-import kafka.server.HostedPartition
+import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
@@ -43,6 +43,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{Before, Test}
 import org.scalatest.Assertions.fail
+
 import scala.collection.JavaConverters._
 import scala.collection._
 
@@ -1875,9 +1876,8 @@ class GroupMetadataManagerTest {
     EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),
-      maxOffset = EasyMock.eq(None),
-      minOneMessage = EasyMock.eq(true),
-      includeAbortedTxns = EasyMock.eq(false)))
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
     EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18))
@@ -1981,9 +1981,8 @@ class GroupMetadataManagerTest {
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),
-      maxOffset = EasyMock.eq(None),
-      minOneMessage = EasyMock.eq(true),
-      includeAbortedTxns = EasyMock.eq(false)))
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
     EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 9719a48..bc6ed93 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -22,7 +22,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
 import kafka.log.Log
-import kafka.server.{DelayedOperationPurgatory, FetchDataInfo, KafkaConfig, LogOffsetMetadata, MetadataCache}
+import kafka.server.{DelayedOperationPurgatory, FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache}
 import kafka.utils.timer.MockTimer
 import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.{ClientResponse, NetworkClient}
@@ -257,9 +257,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),
-      maxOffset = EasyMock.eq(None),
-      minOneMessage = EasyMock.eq(true),
-      includeAbortedTxns = EasyMock.eq(false)))
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
     EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 08d0b32..14745e7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.log.Log
-import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
+import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.{MockScheduler, Pool}
 import org.scalatest.Assertions.fail
 import kafka.zk.KafkaZkClient
@@ -584,9 +584,8 @@ class TransactionStateManagerTest {
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),
-      maxOffset = EasyMock.eq(None),
-      minOneMessage = EasyMock.eq(true),
-      includeAbortedTxns = EasyMock.eq(false)))
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
     EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 557cef3..d7a725b 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
 import org.apache.kafka.common.utils.Utils
 import java.util.{Collection, Properties}
 
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
 
 import scala.collection.JavaConverters._
 
@@ -64,8 +64,10 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
           new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
 
     def readBatch(offset: Int): RecordBatch = {
-      val fetchInfo = log.read(offset, 4096, maxOffset = None,
-        includeAbortedTxns = false, minOneMessage = true)
+      val fetchInfo = log.read(offset,
+        maxLength = 4096,
+        isolation = FetchLogEnd,
+        minOneMessage = true)
       fetchInfo.records.batches.iterator.next()
     }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index bfbd423..684f0b2 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.util.{Collections, Properties}
 
-import kafka.server.FetchDataInfo
+import kafka.server.{FetchDataInfo, FetchLogEnd}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
@@ -465,7 +465,7 @@ class LogManagerTest {
   }
 
   private def readLog(log: Log, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
-    log.read(offset, maxLength, maxOffset = None, minOneMessage = true, includeAbortedTxns = false)
+    log.read(offset, maxLength, isolation = FetchLogEnd, minOneMessage = true)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 0ebc0ef..c3a6899 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -67,7 +67,7 @@ class LogSegmentTest {
   @Test
   def testReadOnEmptySegment() {
     val seg = createSegment(40)
-    val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
+    val read = seg.read(startOffset = 40, maxSize = 300)
     assertNull("Read beyond the last offset in the segment should be null", read)
   }
 
@@ -80,29 +80,11 @@ class LogSegmentTest {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there", "little", "bee")
     seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).records
+    val read = seg.read(startOffset = 41, maxSize = 300).records
     checkEquals(ms.records.iterator, read.records.iterator)
   }
 
   /**
-   * If we set the startOffset and maxOffset for the read to be the same value
-   * we should get only the first message in the log
-   */
-  @Test
-  def testMaxOffset() {
-    val baseOffset = 50
-    val seg = createSegment(baseOffset)
-    val ms = records(baseOffset, "hello", "there", "beautiful")
-    seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    def validate(offset: Long) =
-      assertEquals(ms.records.asScala.filter(_.offset == offset).toList,
-                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).records.records.asScala.toList)
-    validate(50)
-    validate(51)
-    validate(52)
-  }
-
-  /**
    * If we read from an offset beyond the last offset in the segment we should get null
    */
   @Test
@@ -110,7 +92,7 @@ class LogSegmentTest {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
     seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
+    val read = seg.read(startOffset = 52, maxSize = 200)
     assertNull("Read beyond the last offset in the segment should give null", read)
   }
 
@@ -125,7 +107,7 @@ class LogSegmentTest {
     seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
     seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    val read = seg.read(startOffset = 55, maxSize = 200)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
@@ -143,11 +125,11 @@ class LogSegmentTest {
       val ms2 = records(offset + 1, "hello")
       seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
       // check that we can read back both messages
-      val read = seg.read(offset, None, 10000)
+      val read = seg.read(offset, 10000)
       assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
       // now truncate off the last message
       seg.truncateTo(offset + 1)
-      val read2 = seg.read(offset, None, 10000)
+      val read2 = seg.read(offset, 10000)
       assertEquals(1, read2.records.records.asScala.size)
       checkEquals(ms1.records.iterator, read2.records.records.iterator)
       offset += 1
@@ -230,7 +212,7 @@ class LogSegmentTest {
     assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP))
     assertFalse(seg.timeIndex.isFull)
     assertFalse(seg.offsetIndex.isFull)
-    assertNull("Segment should be empty.", seg.read(0, None, 1024))
+    assertNull("Segment should be empty.", seg.read(0, 1024))
 
     seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
   }
@@ -299,8 +281,10 @@ class LogSegmentTest {
     val indexFile = seg.lazyOffsetIndex.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(new ProducerStateManager(topicPartition, logDir))
-    for(i <- 0 until 100)
-      assertEquals(i, seg.read(i, Some(i + 1), 1024).records.records.iterator.next().offset)
+    for(i <- 0 until 100) {
+      val records = seg.read(i, 1, minOneMessage = true).records.records
+      assertEquals(i, records.iterator.next().offset)
+    }
   }
 
   @Test
@@ -439,7 +423,7 @@ class LogSegmentTest {
     seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
     seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    val read = seg.read(startOffset = 55, maxSize = 200)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
@@ -460,7 +444,7 @@ class LogSegmentTest {
     seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
     val ms2 = records(60, "alpha", "beta")
     seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
-    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    val read = seg.read(startOffset = 55, maxSize = 200)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
     val oldSize = seg.log.sizeInBytes()
     val oldPosition = seg.log.channel.position
@@ -474,7 +458,7 @@ class LogSegmentTest {
       initFileSize = 512 * 1024 * 1024, preallocate = true)
     segments += segReopen
 
-    val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    val readAgain = segReopen.read(startOffset = 55, maxSize = 200)
     checkEquals(ms2.records.iterator, readAgain.records.records.iterator)
     val size = segReopen.log.sizeInBytes()
     val position = segReopen.log.channel.position
@@ -503,7 +487,7 @@ class LogSegmentTest {
     seg.truncateTo(offset + 1)
 
     //Then we should still truncate the record that was present (i.e. offset + 3 is gone)
-    val log = seg.read(offset, None, 10000)
+    val log = seg.read(offset, 10000)
     assertEquals(offset, log.records.batches.iterator.next().baseOffset())
     assertEquals(1, log.records.batches.asScala.size)
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 46494e7..fa7be7e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -28,7 +28,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
-import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
@@ -77,7 +77,7 @@ class LogTest {
 
   @Test
   def testHighWatermarkMaintenance(): Unit = {
-    val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
     val log = createLog(logDir, logConfig)
 
     val records = TestUtils.records(List(
@@ -116,6 +116,142 @@ class LogTest {
     assertHighWatermark(3L)
   }
 
+  private def assertNonEmptyFetch(log: Log, offset: Long, isolation: FetchIsolation): Unit = {
+    val readInfo = log.read(startOffset = offset,
+      maxLength = Int.MaxValue,
+      isolation = isolation,
+      minOneMessage = true)
+
+    assertFalse(readInfo.firstEntryIncomplete)
+    assertTrue(readInfo.records.sizeInBytes > 0)
+
+    val upperBoundOffset = isolation match {
+      case FetchLogEnd => log.logEndOffset
+      case FetchHighWatermark => log.highWatermark
+      case FetchTxnCommitted => log.lastStableOffset
+    }
+
+    for (record <- readInfo.records.records.asScala)
+      assertTrue(record.offset < upperBoundOffset)
+
+    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
+  }
+
+  private def assertEmptyFetch(log: Log, offset: Long, isolation: FetchIsolation): Unit = {
+    val readInfo = log.read(startOffset = offset,
+      maxLength = Int.MaxValue,
+      isolation = isolation,
+      minOneMessage = true)
+    assertFalse(readInfo.firstEntryIncomplete)
+    assertEquals(0, readInfo.records.sizeInBytes)
+    assertEquals(offset, readInfo.fetchOffsetMetadata.messageOffset)
+    assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
+  }
+
+  @Test
+  def testFetchUpToLogEndOffset(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    log.appendAsLeader(TestUtils.records(List(
+      new SimpleRecord("0".getBytes),
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )), leaderEpoch = 0)
+    log.appendAsLeader(TestUtils.records(List(
+      new SimpleRecord("3".getBytes),
+      new SimpleRecord("4".getBytes)
+    )), leaderEpoch = 0)
+
+    (log.logStartOffset until log.logEndOffset).foreach { offset =>
+      assertNonEmptyFetch(log, offset, FetchLogEnd)
+    }
+  }
+
+  @Test
+  def testFetchUpToHighWatermark(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    log.appendAsLeader(TestUtils.records(List(
+      new SimpleRecord("0".getBytes),
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )), leaderEpoch = 0)
+    log.appendAsLeader(TestUtils.records(List(
+      new SimpleRecord("3".getBytes),
+      new SimpleRecord("4".getBytes)
+    )), leaderEpoch = 0)
+
+    def assertHighWatermarkBoundedFetches(): Unit = {
+      (log.logStartOffset until log.highWatermark).foreach { offset =>
+        assertNonEmptyFetch(log, offset, FetchHighWatermark)
+      }
+
+      (log.highWatermark to log.logEndOffset).foreach { offset =>
+        assertEmptyFetch(log, offset, FetchHighWatermark)
+      }
+    }
+
+    assertHighWatermarkBoundedFetches()
+
+    log.updateHighWatermark(3L)
+    assertHighWatermarkBoundedFetches()
+
+    log.updateHighWatermark(5L)
+    assertHighWatermarkBoundedFetches()
+  }
+
+  @Test
+  def testFetchUpToLastStableOffset(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    val epoch = 0.toShort
+
+    val producerId1 = 1L
+    val producerId2 = 2L
+
+    val appendProducer1 = appendTransactionalAsLeader(log, producerId1, epoch)
+    val appendProducer2 = appendTransactionalAsLeader(log, producerId2, epoch)
+
+    appendProducer1(5)
+    appendNonTransactionalAsLeader(log, 3)
+    appendProducer2(2)
+    appendProducer1(4)
+    appendNonTransactionalAsLeader(log, 2)
+    appendProducer1(10)
+
+    def assertLsoBoundedFetches(): Unit = {
+      (log.logStartOffset until log.lastStableOffset).foreach { offset =>
+        assertNonEmptyFetch(log, offset, FetchTxnCommitted)
+      }
+
+      (log.lastStableOffset to log.logEndOffset).foreach { offset =>
+        assertEmptyFetch(log, offset, FetchTxnCommitted)
+      }
+    }
+
+    assertLsoBoundedFetches()
+
+    log.updateHighWatermark(log.logEndOffset)
+    assertLsoBoundedFetches()
+
+    appendEndTxnMarkerAsLeader(log, producerId1, epoch, ControlRecordType.COMMIT)
+    assertEquals(0L, log.lastStableOffset)
+
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(8L, log.lastStableOffset)
+    assertLsoBoundedFetches()
+
+    appendEndTxnMarkerAsLeader(log, producerId2, epoch, ControlRecordType.ABORT)
+    assertEquals(8L, log.lastStableOffset)
+
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(log.logEndOffset, log.lastStableOffset)
+    assertLsoBoundedFetches()
+  }
+
   @Test
   def testLogDeleteDirName(): Unit = {
     val name1 = Log.logDeleteDirName(new TopicPartition("foo", 3))
@@ -228,8 +364,8 @@ class LogTest {
     log.appendAsFollower(records2)
 
     assertEquals("Expect two records in the log", 2, log.logEndOffset)
-    assertEquals(0, readLog(log, 0, 100, Some(1)).records.batches.iterator.next().lastOffset)
-    assertEquals(1, readLog(log, 1, 100, Some(2)).records.batches.iterator.next().lastOffset)
+    assertEquals(0, readLog(log, 0, 1).records.batches.iterator.next().lastOffset)
+    assertEquals(1, readLog(log, 1, 1).records.batches.iterator.next().lastOffset)
 
     // roll so that active segment is empty
     log.roll()
@@ -243,7 +379,7 @@ class LogTest {
       baseOffset = 2L, partitionLeaderEpoch = 0)
     log.appendAsFollower(records3)
     assertTrue(log.activeSegment.offsetIndex.maxEntries > 1)
-    assertEquals(2, readLog(log, 2, 100, Some(3)).records.batches.iterator.next().lastOffset)
+    assertEquals(2, readLog(log, 2, 1).records.batches.iterator.next().lastOffset)
     assertEquals("Expect two segments.", 2, log.numberOfSegments)
   }
 
@@ -449,10 +585,9 @@ class LogTest {
           val wrapper = new LogSegment(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, segment.txnIndex, segment.baseOffset,
             segment.indexIntervalBytes, segment.rollJitterMs, mockTime) {
 
-            override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long,
-                              minOneMessage: Boolean): FetchDataInfo = {
+            override def read(startOffset: Long, maxSize: Int, maxPosition: Long, minOneMessage: Boolean): FetchDataInfo = {
               segmentsWithReads += this
-              super.read(startOffset, maxOffset, maxSize, maxPosition, minOneMessage)
+              super.read(startOffset, maxSize, maxPosition, minOneMessage)
             }
 
             override def recover(producerStateManager: ProducerStateManager,
@@ -1392,7 +1527,7 @@ class LogTest {
       log.appendAsLeader(TestUtils.singletonRecords(value = value), leaderEpoch = 0)
 
     for(i <- values.indices) {
-      val read = readLog(log, i, 100, Some(i+1)).records.batches.iterator.next()
+      val read = readLog(log, i, 1).records.batches.iterator.next()
       assertEquals("Offset read should match order appended.", i, read.lastOffset)
       val actual = read.iterator.next()
       assertNull("Key should be null", actual.key)
@@ -1469,16 +1604,13 @@ class LogTest {
       val idx = messageIds.indexWhere(_ >= i)
       val reads = Seq(
         readLog(log, i, 1),
-        readLog(log, i, 100),
-        readLog(log, i, 100, Some(10000))
+        readLog(log, i, 100000),
+        readLog(log, i, 100)
       ).map(_.records.records.iterator.next())
       reads.foreach { read =>
         assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
         assertEquals("Message should match appended.", records(idx), new SimpleRecord(read))
       }
-
-      val fetchedData = readLog(log, i, 1, Some(1))
-      assertEquals(Seq.empty, fetchedData.records.batches.asScala.toIndexedSeq)
     }
   }
 
@@ -1538,9 +1670,6 @@ class LogTest {
     } catch {
       case _: OffsetOutOfRangeException => // This is good.
     }
-
-    assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0,
-      readLog(log, 1025, 1000, Some(1024)).records.sizeInBytes)
   }
 
   /**
@@ -1572,8 +1701,7 @@ class LogTest {
       assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp)
       offset = head.lastOffset + 1
     }
-    val lastRead = readLog(log, startOffset = numMessages, maxLength = 1024*1024,
-      maxOffset = Some(numMessages + 1)).records
+    val lastRead = readLog(log, startOffset = numMessages, maxLength = 1024*1024).records
     assertEquals("Should be no more messages", 0, lastRead.records.asScala.size)
 
     // check that rolling the log forced a flushed, the flush is async so retry in case of failure
@@ -2886,7 +3014,7 @@ class LogTest {
       log.deleteOldSegments()
       assertTrue(log.logStartOffset <= hw)
       log.logSegments.foreach { segment =>
-        val segmentFetchInfo = segment.read(startOffset = segment.baseOffset, maxOffset = None, maxSize = Int.MaxValue)
+        val segmentFetchInfo = segment.read(startOffset = segment.baseOffset, maxSize = Int.MaxValue)
         val segmentLastOffsetOpt = segmentFetchInfo.records.records.asScala.lastOption.map(_.offset)
         segmentLastOffsetOpt.foreach { lastOffset =>
           assertTrue(lastOffset >= hw)
@@ -3099,7 +3227,7 @@ class LogTest {
 
     //Then leader epoch should be set on messages
     for (i <- records.indices) {
-      val read = readLog(log, i, 100, Some(i+1)).records.batches.iterator.next()
+      val read = readLog(log, i, 1).records.batches.iterator.next()
       assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch)
     }
   }
@@ -3613,12 +3741,25 @@ class LogTest {
   }
 
   private def assertValidLogOffsetMetadata(log: Log, offsetMetadata: LogOffsetMetadata): Unit = {
-    val readInfo = log.read(startOffset = offsetMetadata.messageOffset,
-      maxLength = 2048,
-      maxOffset = None,
-      minOneMessage = false,
-      includeAbortedTxns = false)
-    assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata)
+    assertFalse(offsetMetadata.messageOffsetOnly)
+
+    val segmentBaseOffset = offsetMetadata.segmentBaseOffset
+    val segmentOpt = log.logSegments(segmentBaseOffset, segmentBaseOffset + 1).headOption
+    assertTrue(segmentOpt.isDefined)
+
+    val segment = segmentOpt.get
+    assertEquals(segmentBaseOffset, segment.baseOffset)
+    assertTrue(offsetMetadata.relativePositionInSegment <= segment.size)
+
+    val readInfo = segment.read(offsetMetadata.messageOffset,
+      maxSize = 2048,
+      maxPosition = segment.size,
+      minOneMessage = false)
+
+    if (offsetMetadata.relativePositionInSegment < segment.size)
+      assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata)
+    else
+      assertNull(readInfo)
   }
 
   @Test(expected = classOf[TransactionCoordinatorFencedException])
@@ -3863,7 +4004,10 @@ class LogTest {
     assertEquals(None, log.firstUnstableOffset)
 
     // now check that a fetch includes the aborted transaction
-    val fetchDataInfo = log.read(0L, 2048, maxOffset = None, minOneMessage = true, includeAbortedTxns = true)
+    val fetchDataInfo = log.read(0L,
+      maxLength = 2048,
+      isolation = FetchTxnCommitted,
+      minOneMessage = true)
     assertEquals(1, fetchDataInfo.abortedTransactions.size)
 
     assertTrue(fetchDataInfo.abortedTransactions.isDefined)
@@ -4003,10 +4147,12 @@ class LogTest {
       expectDeletedFiles)
   }
 
-  private def readLog(log: Log, startOffset: Long, maxLength: Int,
-                      maxOffset: Option[Long] = None,
+  private def readLog(log: Log,
+                      startOffset: Long,
+                      maxLength: Int,
+                      isolation: FetchIsolation = FetchLogEnd,
                       minOneMessage: Boolean = true): FetchDataInfo = {
-    log.read(startOffset, maxLength, maxOffset, minOneMessage, includeAbortedTxns = false)
+    log.read(startOffset, maxLength, isolation, minOneMessage)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 7637bfc..e0dbf8c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -212,9 +212,8 @@ class ReplicaManagerQuotasTest {
     //if we ask for len 1 return a message
     expect(log.read(anyObject(),
       maxLength = geq(1),
-      maxOffset = anyObject(),
-      minOneMessage = anyBoolean(),
-      includeAbortedTxns = EasyMock.eq(false))).andReturn(
+      isolation = anyObject(),
+      minOneMessage = anyBoolean())).andReturn(
       FetchDataInfo(
         LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, record)
@@ -223,9 +222,8 @@ class ReplicaManagerQuotasTest {
     //if we ask for len = 0, return 0 messages
     expect(log.read(anyObject(),
       maxLength = EasyMock.eq(0),
-      maxOffset = anyObject(),
-      minOneMessage = anyBoolean(),
-      includeAbortedTxns = EasyMock.eq(false))).andReturn(
+      isolation = anyObject(),
+      minOneMessage = anyBoolean())).andReturn(
       FetchDataInfo(
         LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.EMPTY
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1aec9d8..5e9c482 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -90,9 +90,8 @@ class SimpleFetchTest {
     EasyMock.expect(log.read(
       startOffset = 0,
       maxLength = fetchSize,
-      maxOffset = Some(partitionHW),
-      minOneMessage = true,
-      includeAbortedTxns = false))
+      isolation = FetchHighWatermark,
+      minOneMessage = true))
       .andReturn(FetchDataInfo(
         LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
@@ -100,9 +99,8 @@ class SimpleFetchTest {
     EasyMock.expect(log.read(
       startOffset = 0,
       maxLength = fetchSize,
-      maxOffset = None,
-      minOneMessage = true,
-      includeAbortedTxns = false))
+      isolation = FetchLogEnd,
+      minOneMessage = true))
       .andReturn(FetchDataInfo(
         LogOffsetMetadata(0L, 0L, 0),
         MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 01ba69f..84de813 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -367,7 +367,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     printSegments()
 
     def crcSeq(broker: KafkaServer, partition: Int = 0): Seq[Long] = {
-      val batches = getLog(broker, partition).activeSegment.read(0, None, Integer.MAX_VALUE)
+      val batches = getLog(broker, partition).activeSegment.read(0, Integer.MAX_VALUE)
         .records.batches().asScala.toSeq
       batches.map(_.checksum)
     }
@@ -438,7 +438,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
   private def epochCache(broker: KafkaServer): LeaderEpochFileCache = getLog(broker, 0).leaderEpochCache.get
 
   private def latestRecord(leader: KafkaServer, offset: Int = -1, partition: Int = 0): RecordBatch = {
-    getLog(leader, partition).activeSegment.read(0, None, Integer.MAX_VALUE)
+    getLog(leader, partition).activeSegment.read(0, Integer.MAX_VALUE)
       .records.batches().asScala.toSeq.last
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index e6f9156..bdd853c 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -245,10 +245,10 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
       val leo = broker.getLogManager().getLog(tp).get.logEndOffset
       result = result && leo > 0 && brokers.forall { broker =>
         broker.getLogManager().getLog(tp).get.logSegments.iterator.forall { segment =>
-          if (segment.read(minOffset, None, Integer.MAX_VALUE) == null) {
+          if (segment.read(minOffset, Integer.MAX_VALUE) == null) {
             false
           } else {
-            segment.read(minOffset, None, Integer.MAX_VALUE)
+            segment.read(minOffset, Integer.MAX_VALUE)
               .records.batches().iterator().asScala.forall(
               expectedLeaderEpoch == _.partitionLeaderEpoch()
             )

