kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Refactor high watermark access and validation (#7055)
Date Thu, 11 Jul 2019 20:52:24 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a7d4cb  MINOR: Refactor high watermark access and validation (#7055)
7a7d4cb is described below

commit 7a7d4cb5819b169db2f40f0ca2efc04043ff10df
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Jul 11 13:52:04 2019 -0700

    MINOR: Refactor high watermark access and validation (#7055)
    
    The purpose of this patch is to restrict the paths for updating and accessing the high watermark and the last stable offset. By doing so, we can validate that these offsets always remain within the range of the log. We also ensure that we never expose `LogOffsetMetadata` unless it is fully materialized. Finally, this patch makes a few naming changes. In particular, we remove the `highWatermark_=` and `highWatermarkMetadata_=` setters, which are both misleading and cumbersome to test.
    
    Reviewers: David Arthur <mumrah@gmail.com>
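
    As a minimal standalone sketch of the two update rules described above (the simplified `SketchLog` class is hypothetical, not the actual `Log` implementation): `updateHighWatermark` is the follower/initialization path and clamps the proposed value into [logStartOffset, logEndOffset], while the leader path `maybeIncrementHighWatermark` only ever moves the high watermark forward and rejects values past the log end offset.

        // Hypothetical sketch of the update semantics; not Kafka's Log class.
        class SketchLog(val logStartOffset: Long, val logEndOffset: Long) {
          private var highWatermark: Long = logStartOffset

          // Follower/initialization path: bound the proposed value by the log range.
          def updateHighWatermark(hw: Long): Long = {
            highWatermark = math.min(math.max(hw, logStartOffset), logEndOffset)
            highWatermark
          }

          // Leader path: advance only if strictly larger; values past the end are an error.
          def maybeIncrementHighWatermark(newHw: Long): Option[Long] = {
            require(newHw <= logEndOffset,
              s"high watermark $newHw exceeds log end offset $logEndOffset")
            if (newHw > highWatermark) {
              val old = highWatermark
              highWatermark = newHw
              Some(old)   // return the previous value, mirroring the patch's API shape
            } else {
              None        // no update: monotonicity preserved
            }
          }
        }

        object SketchLogDemo extends App {
          val log = new SketchLog(logStartOffset = 0L, logEndOffset = 10L)
          println(log.updateHighWatermark(15L))         // prints 10: clamped to the log end offset
          println(log.maybeIncrementHighWatermark(8L))  // prints None: 8 is not larger than 10
        }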
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  48 ++---
 core/src/main/scala/kafka/log/Log.scala            | 203 +++++++++++++--------
 .../main/scala/kafka/log/LogCleanerManager.scala   |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |   3 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   4 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   9 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   2 +-
 .../scala/unit/kafka/cluster/ReplicaTest.scala     |  12 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |   8 +-
 .../LogCleanerParameterizedIntegrationTest.scala   |   4 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   6 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   8 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 175 ++++++++++--------
 .../server/HighwatermarkPersistenceTest.scala      |  20 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   3 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |   2 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |   5 +-
 18 files changed, 290 insertions(+), 226 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1965d7f..f18ddd2 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -318,9 +318,7 @@ class Partition(val topicPartition: TopicPartition,
       info(s"No checkpointed highwatermark is found for partition $topicPartition")
       0L
     }
-    val initialHighWatermark = math.min(checkpointHighWatermark, log.logEndOffset)
-    log.highWatermarkMetadata = LogOffsetMetadata(initialHighWatermark)
-
+    val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
     info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
     log
   }
@@ -518,8 +516,6 @@ class Partition(val topicPartition: TopicPartition,
       }
 
       if (isNewLeader) {
-        // construct the high watermark metadata for the new leader replica
-        leaderLog.initializeHighWatermarkOffsetMetadata()
         // mark local replica as the leader after converting hw
         leaderReplicaIdOpt = Some(localBrokerId)
         // reset log end offset for remote replicas
@@ -757,25 +753,21 @@ class Partition(val topicPartition: TopicPartition,
       curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicaIds.contains(replica.brokerId)
     }.map(_.logEndOffsetMetadata)
     val newHighWatermark = (replicaLogEndOffsets + leaderLog.logEndOffsetMetadata).min(new LogOffsetMetadata.OffsetOrdering)
-    val oldHighWatermark = leaderLog.highWatermarkMetadata
-
-    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
-    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
-    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
-      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
-      leaderLog.highWatermarkMetadata = newHighWatermark
-      debug(s"High watermark updated to $newHighWatermark")
-      true
-    } else {
-      def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
-        case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
-      }
+    leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
+      case Some(oldHighWatermark) =>
+        debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
+        true
+
+      case None =>
+        def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
+          case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
+        }
 
-      val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata))
-      val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
-      trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +
-        s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
-      false
+        val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata))
+        val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
+        trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
+          s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
+        false
     }
   }
 
@@ -1034,15 +1026,7 @@ class Partition(val topicPartition: TopicPartition,
                           fetchOnlyFromLeader: Boolean): LogOffsetSnapshot = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
-    localLog.offsetSnapshot
-  }
-
-  def fetchOffsetSnapshotOrError(currentLeaderEpoch: Optional[Integer],
-                                 fetchOnlyFromLeader: Boolean): Either[LogOffsetSnapshot, Errors] = {
-    inReadLock(leaderIsrUpdateLock) {
-      getLocalLog(currentLeaderEpoch, fetchOnlyFromLeader)
-        .left.map(_.offsetSnapshot)
-    }
+    localLog.fetchOffsetSnapshot
   }
 
   def legacyFetchOffsetsForTimestamp(timestamp: Long,
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 31412bd..b138239 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -259,14 +259,14 @@ class Log(@volatile var dir: File,
    * that this could result in disagreement between replicas depending on when they began replicating the log.
    * In the worst case, the LSO could be seen by a consumer to go backwards.
    */
-  @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None
+  @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
 
   /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
    * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
    * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
    * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
    */
-  @volatile private[this] var _highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(0)
+  @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
 
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@@ -306,57 +306,126 @@ class Log(@volatile var dir: File,
 
   def highWatermark: Long = highWatermarkMetadata.messageOffset
 
-  def highWatermark_=(newHighWatermark: Long): Unit = {
-    highWatermarkMetadata = LogOffsetMetadata(newHighWatermark)
+  /**
+   * Update the high watermark to a new offset. The new high watermark will be lower
+   * bounded by the log start offset and upper bounded by the log end offset.
+   *
+   * This is intended to be called when initializing the high watermark or when updating
+   * it on a follower after receiving a Fetch response from the leader.
+   *
+   * @param hw the suggested new value for the high watermark
+   * @return the updated high watermark offset
+   */
+  def updateHighWatermark(hw: Long): Long = {
+    val newHighWatermark = if (hw < logStartOffset)
+      logStartOffset
+    else if (hw > logEndOffset)
+      logEndOffset
+    else
+      hw
+    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
+    newHighWatermark
   }
 
-  def highWatermarkMetadata: LogOffsetMetadata = _highWatermarkMetadata
+  /**
+   * Update the high watermark to a new value if and only if it is larger than the old value. It is
+   * an error to update to a value which is larger than the log end offset.
+   *
+   * This method is intended to be used by the leader to update the high watermark after follower
+   * fetch offsets have been updated.
+   *
+   * @return the old high watermark, if updated by the new value
+   */
+  def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
+    if (newHighWatermark.messageOffset > logEndOffset)
+      throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
+        s"log end offset $logEndOffsetMetadata")
+
+    val oldHighWatermark = fetchHighWatermarkMetadata
+
+    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
+    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
+      updateHighWatermarkMetadata(newHighWatermark)
+      Some(oldHighWatermark)
+    } else {
+      None
+    }
+  }
 
-  def highWatermarkMetadata_=(newHighWatermark: LogOffsetMetadata) {
+  /**
+   * Get the offset and metadata for the current high watermark. If offset metadata is not
+   * known, this will do a lookup in the index and cache the result.
+   */
+  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
+    checkIfMemoryMappedBufferClosed()
+
+    val offsetMetadata = highWatermarkMetadata
+    if (offsetMetadata.messageOffsetOnly) {
+      lock.synchronized {
+        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
+        updateHighWatermarkMetadata(fullOffset)
+        fullOffset
+      }
+    } else {
+      offsetMetadata
+    }
+  }
+
+  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
     if (newHighWatermark.messageOffset < 0)
       throw new IllegalArgumentException("High watermark offset should be non-negative")
 
     lock synchronized {
-      _highWatermarkMetadata = newHighWatermark
+      highWatermarkMetadata = newHighWatermark
       producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
-      updateFirstUnstableOffset()
+      maybeIncrementFirstUnstableOffset()
     }
-    trace(s"Setting high watermark [$newHighWatermark]")
+    trace(s"Setting high watermark $newHighWatermark")
   }
 
-  /*
-   * Convert hw to local offset metadata by reading the log at the hw offset.
-   * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
+  /**
+   * Get the first unstable offset. Unlike the last stable offset, which is always defined,
+   * the first unstable offset only exists if there are transactions in progress.
+   *
+   * @return the first unstable offset, if it exists
    */
-  def initializeHighWatermarkOffsetMetadata(): Unit = {
-    if (highWatermarkMetadata.messageOffsetOnly) {
-      lock.synchronized {
-        highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse {
-          convertToOffsetMetadata(logStartOffset).getOrElse {
-            val firstSegmentOffset = logSegments.head.baseOffset
-            LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
+  private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
+
+  private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
+    checkIfMemoryMappedBufferClosed()
+
+    firstUnstableOffsetMetadata match {
+      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark =>
+        if (offsetMetadata.messageOffsetOnly) {
+          lock synchronized {
+            val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
+            if (firstUnstableOffsetMetadata.contains(offsetMetadata))
+              firstUnstableOffsetMetadata = Some(fullOffset)
+            fullOffset
           }
+        } else {
+          offsetMetadata
         }
-      }
+      case _ => fetchHighWatermarkMetadata
     }
   }
 
   /**
-    * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
-    * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
-    * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
-    * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
-    * beyond the high watermark.
-    */
-  def lastStableOffsetMetadata: LogOffsetMetadata = {
-    firstUnstableOffset match {
-      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark => offsetMetadata
-      case _ => highWatermarkMetadata
+   * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
+   * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
+   * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
+   * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
+   * beyond the high watermark.
+   */
+  def lastStableOffset: Long = {
+    firstUnstableOffsetMetadata match {
+      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark => offsetMetadata.messageOffset
+      case _ => highWatermark
     }
   }
 
-  def lastStableOffset: Long = lastStableOffsetMetadata.messageOffset
-
   def lastStableOffsetLag: Long = highWatermark - lastStableOffset
 
   /**
@@ -364,28 +433,9 @@ class Log(@volatile var dir: File,
     * the LogOffsetMetadata for the high watermark and last stable offset if they are message-only. Throws an
     * offset out of range error if the segment info cannot be loaded.
     */
-  def offsetSnapshot: LogOffsetSnapshot = {
-    var highWatermark = _highWatermarkMetadata
-    if (highWatermark.messageOffsetOnly) {
-      lock.synchronized {
-        val fullOffset = convertToOffsetMetadataOrThrow(_highWatermarkMetadata.messageOffset)
-        _highWatermarkMetadata = fullOffset
-        highWatermark = _highWatermarkMetadata
-      }
-    }
-
-    var lastStable: LogOffsetMetadata = lastStableOffsetMetadata
-    if (lastStable.messageOffsetOnly) {
-      lock synchronized {
-        firstUnstableOffset match {
-          case None => highWatermark
-          case Some(offsetMetadata) =>
-            val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
-            firstUnstableOffset = Some(fullOffset)
-            lastStable = fullOffset
-        }
-      }
-    }
+  def fetchOffsetSnapshot: LogOffsetSnapshot = {
+    val lastStable = fetchLastStableOffsetMetadata
+    val highWatermark = fetchHighWatermarkMetadata
 
     LogOffsetSnapshot(
       logStartOffset,
@@ -678,6 +728,12 @@ class Log(@volatile var dir: File,
 
   private def updateLogEndOffset(messageOffset: Long) {
     nextOffsetMetadata = LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
+
+    // Update the high watermark in case it has gotten ahead of the log end offset
+    // following a truncation.
+    if (highWatermark > messageOffset) {
+      updateHighWatermarkMetadata(nextOffsetMetadata)
+    }
   }
 
   /**
@@ -804,7 +860,7 @@ class Log(@volatile var dir: File,
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
     rebuildProducerState(lastOffset, reloadFromCleanShutdown, producerStateManager)
-    updateFirstUnstableOffset()
+    maybeIncrementFirstUnstableOffset()
   }
 
   private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
@@ -1098,7 +1154,7 @@ class Log(@volatile var dir: File,
         producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
 
         // update the first unstable offset (which is used to compute LSO)
-        updateFirstUnstableOffset()
+        maybeIncrementFirstUnstableOffset()
 
         trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
           s"first offset: ${appendInfo.firstOffset}, " +
@@ -1131,20 +1187,19 @@ class Log(@volatile var dir: File,
     }
   }
 
-  private def updateFirstUnstableOffset(): Unit = lock synchronized {
+  private def maybeIncrementFirstUnstableOffset(): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
+
     val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
       case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
         val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
-        val segment = segments.floorEntry(offset).getValue
-        val position  = segment.translateOffset(offset)
-        Some(LogOffsetMetadata(offset, segment.baseOffset, position.position))
+        Some(convertToOffsetMetadataOrThrow(offset))
       case other => other
     }
 
-    if (updatedFirstStableOffset != this.firstUnstableOffset) {
+    if (updatedFirstStableOffset != this.firstUnstableOffsetMetadata) {
       debug(s"First unstable offset updated to $updatedFirstStableOffset")
-      this.firstUnstableOffset = updatedFirstStableOffset
+      this.firstUnstableOffsetMetadata = updatedFirstStableOffset
     }
   }
 
@@ -1154,7 +1209,7 @@ class Log(@volatile var dir: File,
   def maybeIncrementLogStartOffset(newLogStartOffset: Long) {
     if (newLogStartOffset > highWatermark)
       throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
-        s"since it is larger than the high watermark ${highWatermark}")
+        s"since it is larger than the high watermark $highWatermark")
 
     // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
     // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
@@ -1167,7 +1222,7 @@ class Log(@volatile var dir: File,
           logStartOffset = newLogStartOffset
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
           producerStateManager.truncateHead(logStartOffset)
-          updateFirstUnstableOffset()
+          maybeIncrementFirstUnstableOffset()
         }
       }
     }
@@ -1559,22 +1614,10 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, return None to the caller.
-   */
-  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
-    try {
-      Some(convertToOffsetMetadataOrThrow(offset))
-    } catch {
-      case _: OffsetOutOfRangeException => None
-    }
-  }
-
-  /**
     * Given a message offset, find its corresponding offset metadata in the log.
     * If the message offset is out of range, throw an OffsetOutOfRangeException
     */
-  def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
     val fetchDataInfo = read(offset,
       maxLength = 1,
       maxOffset = None,
@@ -2006,7 +2049,7 @@ class Log(@volatile var dir: File,
 
         producerStateManager.truncate()
         producerStateManager.updateMapEndOffset(newOffset)
-        updateFirstUnstableOffset()
+        maybeIncrementFirstUnstableOffset()
 
         this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
         this.logStartOffset = newOffset
@@ -2047,8 +2090,8 @@ class Log(@volatile var dir: File,
     logString.append(s"Log(dir=$dir")
     logString.append(s", topic=${topicPartition.topic}")
     logString.append(s", partition=${topicPartition.partition}")
-    logString.append(s", highWatermark=$highWatermarkMetadata")
-    logString.append(s", lastStableOffset=$lastStableOffsetMetadata")
+    logString.append(s", highWatermark=$highWatermark")
+    logString.append(s", lastStableOffset=$lastStableOffset")
     logString.append(s", logStartOffset=$logStartOffset")
     logString.append(s", logEndOffset=$logEndOffset")
     logString.append(")")
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ed3d526..2397e02 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -544,7 +544,7 @@ private[log] object LogCleanerManager extends Logging {
     val firstUncleanableDirtyOffset: Long = Seq(
 
       // we do not clean beyond the first unstable offset
-      log.firstUnstableOffset.map(_.messageOffset),
+      log.firstUnstableOffset,
 
       // the active segment is always uncleanable
       Option(log.activeSegment.baseOffset),
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 61b8231..320f346 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -803,7 +803,8 @@ class LogManager(logDirs: Seq[File],
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
       destLog.renameDir(Log.logDirName(topicPartition))
-      destLog.highWatermarkMetadata = sourceLog.highWatermarkMetadata
+      destLog.updateHighWatermark(sourceLog.highWatermark)
+
       // Now that future replica has been successfully renamed to be the current replica
       // Update the cached map and log cleaner as appropriate.
       futureLogs.remove(topicPartition)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 46e90fc..24dbb3d 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchResponse.PartitionData
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, Set, mutable}
@@ -111,7 +111,7 @@ class ReplicaAlterLogDirsThread(name: String,
         topicPartition, fetchOffset, futureLog.logEndOffset))
 
     val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
-    futureLog.highWatermark = futureLog.logEndOffset.min(partitionData.highWatermark)
+    futureLog.updateHighWatermark(partitionData.highWatermark)
     futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset)
 
     if (partition.maybeReplaceCurrentWithFutureReplica())
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ccf9bc1..1d2fdee 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -163,12 +163,11 @@ class ReplicaFetcherThread(name: String,
     if (isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
         .format(log.logEndOffset, records.sizeInBytes, topicPartition))
-    val followerHighWatermark = log.logEndOffset.min(partitionData.highWatermark)
     val leaderLogStartOffset = partitionData.logStartOffset
-    // for the follower replica, we do not need to keep
-    // its segment base offset the physical position,
-    // these values will be computed upon making the leader
-    log.highWatermark = followerHighWatermark
+
+    // For the follower replica, we do not need to keep its segment base offset and physical position.
+    // These values will be computed upon becoming leader or handling a preferred read replica fetch.
+    val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
     log.maybeIncrementLogStartOffset(leaderLogStartOffset)
     if (isTraceEnabled)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c71e29c..aba5aca 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -957,7 +957,7 @@ class ReplicaManager(val config: KafkaConfig,
               s"${preferredReadReplica.get} for $clientMetadata")
           }
           // If a preferred read-replica is set, skip the read
-          val offsetSnapshot: LogOffsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false)
+          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false)
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
             highWatermark = offsetSnapshot.highWatermark.messageOffset,
             leaderLogStartOffset = offsetSnapshot.logStartOffset,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 33c0d95..aeb9768 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -743,7 +743,7 @@ class PartitionTest {
     assertEquals(0L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
     assertEquals(0L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
 
-    partition.log.get.highWatermark = 1L
+    partition.log.get.updateHighWatermark(1L)
 
     assertEquals(3L, fetchLatestOffset(isolationLevel = None).offset)
     assertEquals(1L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 31192f0..d6f1094 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -62,17 +62,15 @@ class ReplicaTest {
 
   @Test
   def testSegmentDeletionWithHighWatermarkInitialization(): Unit = {
-    val initialHighWatermark = 25L
-    log.highWatermark = initialHighWatermark
-
-    assertEquals(initialHighWatermark, log.highWatermark)
-
     val expiredTimestamp = time.milliseconds() - 1000
     for (i <- 0 until 100) {
       val records = TestUtils.singletonRecords(value = s"test$i".getBytes, timestamp = expiredTimestamp)
       log.appendAsLeader(records, leaderEpoch = 0)
     }
 
+    val initialHighWatermark = log.updateHighWatermark(25L)
+    assertEquals(25L, initialHighWatermark)
+
     val initialNumSegments = log.numberOfSegments
     log.deleteOldSegments()
     assertTrue(log.numberOfSegments < initialNumSegments)
@@ -94,7 +92,7 @@ class ReplicaTest {
     assertEquals(100L, log.logEndOffset)
 
     for (hw <- 0 to 100) {
-      log.highWatermark = hw
+      log.updateHighWatermark(hw)
       assertEquals(hw, log.highWatermark)
       log.deleteOldSegments()
       assertTrue(log.logStartOffset <= hw)
@@ -123,7 +121,7 @@ class ReplicaTest {
       log.appendAsLeader(records, leaderEpoch = 0)
     }
 
-    log.highWatermark = 25L
+    log.updateHighWatermark(25L)
     log.maybeIncrementLogStartOffset(26L)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 15b47ca..3935338 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
@@ -218,7 +218,7 @@ class LogCleanerManagerTest extends Logging {
     log.appendAsLeader(records, leaderEpoch = 0)
     log.roll()
     log.appendAsLeader(records, leaderEpoch = 0)
-    log.highWatermarkMetadata = LogOffsetMetadata(2L)
+    log.updateHighWatermark(2L)
 
     // simulate cleanup thread working on the log partition
     val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
@@ -403,7 +403,7 @@ class LogCleanerManagerTest extends Logging {
     log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 2,
       new SimpleRecord(time.milliseconds(), "3".getBytes, "c".getBytes)), leaderEpoch = 0)
     log.roll()
-    log.highWatermarkMetadata = LogOffsetMetadata(3L)
+    log.updateHighWatermark(3L)
 
     time.sleep(compactionLag + 1)
     // although the compaction lag has been exceeded, the undecided data should not be cleaned
@@ -415,7 +415,7 @@ class LogCleanerManagerTest extends Logging {
     log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
       new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0, isFromClient = false)
     log.roll()
-    log.highWatermarkMetadata = LogOffsetMetadata(4L)
+    log.updateHighWatermark(4L)
 
     // the first segment should now become cleanable immediately
     cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 5282e6d..4074191 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import kafka.api.KAFKA_0_11_0_IV0
 import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
-import kafka.server.{KafkaConfig, LogOffsetMetadata}
+import kafka.server.KafkaConfig
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -101,7 +101,7 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
       val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
       val startSize = log.size
 
-      log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
+      log.updateHighWatermark(log.logEndOffset)
 
       val firstDirty = log.activeSegment.baseOffset
       cleaner.startup()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 984509c..40ae5a8 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -24,7 +24,7 @@ import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -126,9 +126,9 @@ class LogCleanerTest {
     val t = new Thread() {
       override def run(): Unit = {
         deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
-        log.highWatermark = log.activeSegment.baseOffset
+        log.updateHighWatermark(log.activeSegment.baseOffset)
         log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset)
-        log.highWatermarkMetadata = LogOffsetMetadata(log.activeSegment.baseOffset)
+        log.updateHighWatermark(log.activeSegment.baseOffset)
         log.deleteOldSegments()
         deleteCompleteLatch.countDown()
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 94ae3ed..1e6d2dc 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.util.{Collections, Properties}
 
-import kafka.server.{FetchDataInfo, LogOffsetMetadata}
+import kafka.server.FetchDataInfo
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -119,8 +119,7 @@ class LogManagerTest {
       offset = info.lastOffset
     }
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
-    log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
 
     log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
 
@@ -174,8 +173,7 @@ class LogManagerTest {
       offset = info.firstOffset.get
     }
 
-    log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.segmentSize, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c584fc0..6bb9301 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -76,6 +76,47 @@ class LogTest {
   }
 
   @Test
+  def testHighWatermarkMaintenance(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L)
+    val log = createLog(logDir, logConfig)
+
+    val records = TestUtils.records(List(
+      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
+      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
+      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
+    ))
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    // High watermark not changed by append
+    log.appendAsLeader(records, leaderEpoch = 0)
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(LogOffsetMetadata(1L))
+    assertHighWatermark(1L)
+
+    // Cannot update past the log end offset
+    log.updateHighWatermark(5L)
+    assertHighWatermark(3L)
+
+    // Update high watermark as follower
+    log.appendAsLeader(records, leaderEpoch = 0)
+    log.updateHighWatermark(6L)
+    assertHighWatermark(6L)
+
+    // High watermark should be adjusted by truncation
+    log.truncateTo(3L)
+    assertHighWatermark(3L)
+  }
+
+  @Test
   def testLogDeleteDirName(): Unit = {
     val name1 = Log.logDeleteDirName(new TopicPartition("foo", 3))
     assertTrue(name1.length <= 255)
@@ -317,7 +358,7 @@ class LogTest {
 
     // Increment the log start offset
     val startOffset = 4
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(startOffset)
     assertTrue(log.logEndOffset > log.logStartOffset)
 
@@ -831,7 +872,7 @@ class LogTest {
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
     assertEquals(2, log.activeProducersWithLastSequence.size)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(1L)
 
     assertEquals(1, log.activeProducersWithLastSequence.size)
@@ -864,7 +905,7 @@ class LogTest {
     assertEquals(2, log.logSegments.size)
     assertEquals(2, log.activeProducersWithLastSequence.size)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(1L)
     log.deleteOldSegments()
 
@@ -923,7 +964,7 @@ class LogTest {
     assertEquals(3, log.logSegments.size)
     assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(2, log.logSegments.size)
@@ -1012,7 +1053,7 @@ class LogTest {
     log.appendAsLeader(records, leaderEpoch = 0)
     val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.highWatermark = commitAppendInfo.lastOffset + 1
+    log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -1020,7 +1061,7 @@ class LogTest {
     log.close()
 
     val reopenedLog = createLog(logDir, logConfig)
-    reopenedLog.highWatermark = commitAppendInfo.lastOffset + 1
+    reopenedLog.updateHighWatermark(commitAppendInfo.lastOffset + 1)
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
 
@@ -1559,7 +1600,7 @@ class LogTest {
       assertEquals(currOffset, messagesToAppend)
 
       // time goes by; the log file is deleted
-      log.highWatermark = currOffset
+      log.updateHighWatermark(currOffset)
       log.deleteOldSegments()
 
       assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
@@ -2035,7 +2076,7 @@ class LogTest {
     val segments = log.logSegments.toArray
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.lazyOffsetIndex.file)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
@@ -2065,7 +2106,7 @@ class LogTest {
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // expire all segments
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     log.close()
     log = createLog(logDir, logConfig)
@@ -2820,7 +2861,7 @@ class LogTest {
 
     // only segments with offset before the current high watermark are eligible for deletion
     for (hw <- 25 to 30) {
-      log.highWatermark = hw
+      log.updateHighWatermark(hw)
       log.deleteOldSegments()
       assertTrue(log.logStartOffset <= hw)
       log.logSegments.foreach { segment =>
@@ -2833,7 +2874,7 @@ class LogTest {
     }
 
     // expire all segments
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
     assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
@@ -2877,7 +2918,7 @@ class LogTest {
       log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("should have 3 segments", 3, log.numberOfSegments)
     assertEquals(log.logStartOffset, 0)
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
 
     log.maybeIncrementLogStartOffset(1)
     log.deleteOldSegments()
@@ -2909,7 +2950,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("should have 2 segments", 2,log.numberOfSegments)
   }
@@ -2924,7 +2965,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("should have 3 segments", 3,log.numberOfSegments)
   }
@@ -2939,7 +2980,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
@@ -2954,7 +2995,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
   }
@@ -2973,7 +3014,7 @@ class LogTest {
     log.logSegments.head.lastModified = mockTime.milliseconds - 20000
 
     val segments = log.numberOfSegments
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", segments, log.numberOfSegments)
   }
@@ -2988,7 +3029,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
@@ -3006,13 +3047,13 @@ class LogTest {
 
     // Three segments should be created
     assertEquals(3, log.logSegments.count(_ => true))
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(recordsPerSegment)
 
     // The first segment, which is entirely before the log start offset, should be deleted
     // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset
     // greater than the start offset
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 2 segments remaining", 2, log.numberOfSegments)
     assertTrue(log.logSegments.head.baseOffset <= log.logStartOffset)
@@ -3084,7 +3125,7 @@ class LogTest {
     cache.assign(2, 10)
 
     //When first segment is removed
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
 
     //The oldest epoch entry should have been removed
@@ -3109,7 +3150,7 @@ class LogTest {
     cache.assign(2, 10)
 
     //When first segment removed (up to offset 5)
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
 
     //The first entry should have gone from (0,0) => (0,5)
@@ -3219,6 +3260,7 @@ class LogTest {
     builder.build()
   }
 
+  @Test
   def testFirstUnstableOffsetNoTransactionalData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
@@ -3248,7 +3290,7 @@ class LogTest {
       new SimpleRecord("baz".getBytes))
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // add more transactional records
     seq += 3
@@ -3256,15 +3298,15 @@ class LogTest {
       new SimpleRecord("blah".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // now transaction is committed
     val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid, epoch),
       isFromClient = false, leaderEpoch = 0)
 
     // first unstable offset is not updated until the high watermark is advanced
-    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
-    log.highWatermark = commitAppendInfo.lastOffset + 1
+    assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
+    log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -3315,13 +3357,13 @@ class LogTest {
     assertEquals(expectedTransactions, abortedTransactions)
 
     // Verify caching of the segment position of the first unstable offset
-    log.highWatermark = 30L
+    log.updateHighWatermark(30L)
     assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
 
-    log.highWatermark = 75L
+    log.updateHighWatermark(75L)
     assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     assertEquals(None, log.firstUnstableOffset)
   }
 
@@ -3531,19 +3573,19 @@ class LogTest {
     assertEquals(expectedTransactions, abortedTransactions)
 
     // Verify caching of the segment position of the first unstable offset
-    log.highWatermark = 30L
+    log.updateHighWatermark(30L)
     assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
 
-    log.highWatermark = 75L
+    log.updateHighWatermark(75L)
     assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     assertEquals(None, log.firstUnstableOffset)
   }
 
   private def assertCachedFirstUnstableOffset(log: Log, expectedOffset: Long): Unit = {
-    assertTrue(log.firstUnstableOffset.isDefined)
-    val firstUnstableOffset = log.firstUnstableOffset.get
+    assertTrue(log.producerStateManager.firstUnstableOffset.isDefined)
+    val firstUnstableOffset = log.producerStateManager.firstUnstableOffset.get
     assertEquals(expectedOffset, firstUnstableOffset.messageOffset)
     assertFalse(firstUnstableOffset.messageOffsetOnly)
     assertValidLogOffsetMetadata(log, firstUnstableOffset)
@@ -3600,7 +3642,7 @@ class LogTest {
   }
 
   @Test
-  def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
+  def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
@@ -3615,17 +3657,17 @@ class LogTest {
     assertEquals(2, log.logSegments.size)
     appendPid(5)
 
-    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(0L), log.firstUnstableOffset)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(5L)
 
     // the first unstable offset should be lower bounded by the log start offset
-    assertEquals(Some(5L), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(5L), log.firstUnstableOffset)
   }
 
   @Test
-  def testFirstUnstableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = {
+  def testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
     val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
@@ -3640,16 +3682,16 @@ class LogTest {
     assertEquals(2, log.logSegments.size)
     appendPid(5)
 
-    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(0L), log.firstUnstableOffset)
 
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(8L)
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals(1, log.logSegments.size)
 
     // the first unstable offset should be lower bounded by the log start offset
-    assertEquals(Some(8L), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(Some(8L), log.firstUnstableOffset)
   }
 
   @Test
@@ -3671,26 +3713,26 @@ class LogTest {
       appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
     }
     assertEquals(11L, log.logEndOffset)
-    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(0L, log.lastStableOffset)
 
     // Try the append a second time. The appended offset in the log should still increase.
     assertThrows[KafkaStorageException] {
       appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
     }
     assertEquals(12L, log.logEndOffset)
-    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(0L, log.lastStableOffset)
 
     // Even if the high watermark is updated, the first unstable offset does not move
-    log.highWatermark = 12L
-    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+    log.updateHighWatermark(12L)
+    assertEquals(0L, log.lastStableOffset)
 
     log.close()
 
     val reopenedLog = createLog(logDir, logConfig)
     assertEquals(12L, reopenedLog.logEndOffset)
     assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
-    reopenedLog.highWatermark = 12L
-    assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
+    reopenedLog.updateHighWatermark(12L)
+    assertEquals(None, reopenedLog.firstUnstableOffset)
   }
 
   @Test
@@ -3705,23 +3747,14 @@ class LogTest {
       new SimpleRecord("c".getBytes)), 5)
 
 
-    log.highWatermark = 2L
-    var offsets: LogOffsetSnapshot = log.offsetSnapshot
+    log.updateHighWatermark(2L)
+    var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot
     assertEquals(offsets.highWatermark.messageOffset, 2L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)
 
-    offsets = log.offsetSnapshot
+    offsets = log.fetchOffsetSnapshot
     assertEquals(offsets.highWatermark.messageOffset, 2L)
     assertFalse(offsets.highWatermark.messageOffsetOnly)
-
-    try {
-      log.highWatermark = 100L
-      offsets = log.offsetSnapshot
-      fail("Should have thrown")
-    } catch {
-      case e: OffsetOutOfRangeException => // pass
-      case _ => fail("Should have seen OffsetOutOfRangeException")
-    }
   }
 
   @Test
@@ -3742,7 +3775,7 @@ class LogTest {
       new SimpleRecord("a".getBytes),
       new SimpleRecord("b".getBytes),
       new SimpleRecord("c".getBytes)), leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // mix in some non-transactional data
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
@@ -3757,20 +3790,20 @@ class LogTest {
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
 
     // LSO should not have changed
-    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // now first producer's transaction is aborted
     val abortAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid1, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.highWatermark = abortAppendInfo.lastOffset + 1
+    log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
 
     // LSO should now point to one less than the first offset of the second transaction
-    assertEquals(Some(secondAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
+    assertEquals(secondAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // commit the second transaction
     val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid2, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.highWatermark = commitAppendInfo.lastOffset + 1
+    log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
 
     // now there should be no first unstable offset
     assertEquals(None, log.firstUnstableOffset)
@@ -3791,8 +3824,7 @@ class LogTest {
     val log = createLog(logDir, logConfig)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
-    assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset))
+    assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
 
     // this write should spill to the second segment
     seq = 3
@@ -3800,15 +3832,14 @@ class LogTest {
       new SimpleRecord("d".getBytes),
       new SimpleRecord("e".getBytes),
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
-    assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset))
-    assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset))
+    assertEquals(firstAppendInfo.firstOffset, log.firstUnstableOffset)
     assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
 
     // now abort the transaction
     val appendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch),
       isFromClient = false, leaderEpoch = 0)
-    log.highWatermark = appendInfo.lastOffset + 1
-    assertEquals(None, log.firstUnstableOffset.map(_.messageOffset))
+    log.updateHighWatermark(appendInfo.lastOffset + 1)
+    assertEquals(None, log.firstUnstableOffset)
 
     // now check that a fetch includes the aborted transaction
     val fetchDataInfo = log.read(0L, 2048, maxOffset = None, minOneMessage = true, includeAbortedTxns = true)
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 3eeb064..d99701a 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -28,7 +28,9 @@ import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 
+import kafka.cluster.Partition
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.SimpleRecord
 
 class HighwatermarkPersistenceTest {
 
@@ -85,7 +87,7 @@ class HighwatermarkPersistenceTest {
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(log0.highWatermark, fooPartition0Hw)
       // set the high watermark for local replica
-      partition0.localLogOrException.highWatermark = 5L
+      partition0.localLogOrException.updateHighWatermark(5L)
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(log0.highWatermark, fooPartition0Hw)
@@ -128,7 +130,8 @@ class HighwatermarkPersistenceTest {
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(topic1Log0.highWatermark, topic1Partition0Hw)
       // set the high watermark for local replica
-      topic1Partition0.localLogOrException.highWatermark = 5L
+      append(topic1Partition0, count = 5)
+      topic1Partition0.localLogOrException.updateHighWatermark(5L)
       replicaManager.checkpointHighWatermarks()
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(5L, topic1Log0.highWatermark)
@@ -144,10 +147,12 @@ class HighwatermarkPersistenceTest {
       var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
       assertEquals(topic2Log0.highWatermark, topic2Partition0Hw)
       // set the highwatermark for local replica
-      topic2Partition0.localLogOrException.highWatermark = 15L
+      append(topic2Partition0, count = 15)
+      topic2Partition0.localLogOrException.updateHighWatermark(15L)
       assertEquals(15L, topic2Log0.highWatermark)
       // change the highwatermark for topic1
-      topic1Partition0.localLogOrException.highWatermark = 10L
+      append(topic1Partition0, count = 5)
+      topic1Partition0.localLogOrException.updateHighWatermark(10L)
       assertEquals(10L, topic1Log0.highWatermark)
       replicaManager.checkpointHighWatermarks()
       // verify checkpointed hw for topic 2
@@ -165,7 +170,12 @@ class HighwatermarkPersistenceTest {
     }
   }
 
-  def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
+  private def append(partition: Partition, count: Int): Unit = {
+    val records = TestUtils.records((0 to count).map(i => new SimpleRecord(s"$i".getBytes)))
+    partition.localLogOrException.appendAsLeader(records, leaderEpoch = 0)
+  }
+
+  private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
     replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(
       new TopicPartition(topic, partition), 0L)
   }
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 46fcee5..27f5499 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -78,8 +78,7 @@ class LogOffsetTest extends BaseRequestTest {
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
-    log.highWatermarkMetadata = LogOffsetMetadata(log.logEndOffset)
-    log.highWatermark = log.logEndOffset
+    log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(3)
     log.deleteOldSegments()
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index fddceff..7637bfc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -248,7 +248,7 @@ class ReplicaManagerQuotasTest {
     //create the two replicas
     for ((p, _) <- fetchInfo) {
       val partition = replicaManager.createPartition(p)
-      log.highWatermark = 5
+      log.updateHighWatermark(5)
       partition.leaderReplicaIdOpt = Some(leaderBrokerId)
       partition.setLog(log, isFutureLog = false)
 
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index bf5c20e..1aec9d8 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -83,7 +83,8 @@ class SimpleFetchTest {
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLEO)).anyTimes()
-    EasyMock.expect(log.highWatermarkMetadata).andReturn(LogOffsetMetadata(partitionHW)).anyTimes()
+    EasyMock.expect(log.maybeIncrementHighWatermark(EasyMock.anyObject[LogOffsetMetadata]))
+      .andReturn(Some(LogOffsetMetadata(partitionHW))).anyTimes()
     EasyMock.expect(log.highWatermark).andReturn(partitionHW).anyTimes()
     EasyMock.expect(log.lastStableOffset).andReturn(partitionHW).anyTimes()
     EasyMock.expect(log.read(
@@ -123,7 +124,7 @@ class SimpleFetchTest {
     val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
 
     // create the leader replica with the local log
-    log.highWatermark = partitionHW
+    log.updateHighWatermark(partitionHW)
     partition.leaderReplicaIdOpt = Some(configs.head.brokerId)
     partition.setLog(log, false)
 

