kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Move common out of range handling into AbstractFetcherThread (#5608)
Date Sun, 09 Sep 2018 00:00:21 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0984a76  MINOR: Move common out of range handling into AbstractFetcherThread (#5608)
0984a76 is described below

commit 0984a76b712caa18c688eafbacaa2a7c889d27b2
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Sat Sep 8 17:00:13 2018 -0700

    MINOR: Move common out of range handling into AbstractFetcherThread (#5608)
    
    This patch removes the duplication of the out of range handling between `ReplicaFetcherThread` and `ReplicaAlterLogDirsThread` and attempts to expose a cleaner API for extension. It also adds a mock implementation to facilitate testing and several new test cases.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 .../apache/kafka/common/requests/FetchRequest.java |   2 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |  20 +-
 core/src/main/scala/kafka/cluster/Replica.scala    |   5 +-
 .../scala/kafka/server/AbstractFetcherThread.scala | 231 +++++++----
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  79 ++--
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 140 +++----
 .../ReplicaFetcherThreadFatalErrorTest.scala       |   2 +-
 .../kafka/server/AbstractFetcherThreadTest.scala   | 454 ++++++++++++++++-----
 8 files changed, 615 insertions(+), 318 deletions(-)
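
Before reading the diff, it helps to have the shape of the refactoring in mind:
AbstractFetcherThread now owns the shared out-of-range and truncation logic, while
ReplicaFetcherThread and ReplicaAlterLogDirsThread supply narrow primitives over
their respective logs and "leaders". A condensed sketch of that template-method
structure, using the method names from the diff but omitting almost everything
else (illustrative only, not the actual class):

    import org.apache.kafka.common.TopicPartition

    abstract class FetcherSketch {
      // primitives implemented by the two subclasses
      protected def logEndOffset(tp: TopicPartition): Long
      protected def fetchLatestOffsetFromLeader(tp: TopicPartition): Long
      protected def fetchEarliestOffsetFromLeader(tp: TopicPartition): Long
      protected def isUncleanLeaderElectionAllowed(tp: TopicPartition): Boolean
      protected def truncateFullyAndStartAt(tp: TopicPartition, offset: Long): Unit

      // shared algorithm, previously duplicated in both fetcher threads
      protected def handleOffsetOutOfRange(tp: TopicPartition): Long = {
        val replicaEndOffset = logEndOffset(tp)
        val leaderEndOffset = fetchLatestOffsetFromLeader(tp)
        if (leaderEndOffset < replicaEndOffset) {
          // follower is ahead of the leader: truncate down to the leader's end
          // offset (the real code first consults isUncleanLeaderElectionAllowed
          // and exits fatally if truncation is disallowed)
          leaderEndOffset
        } else {
          // follower may be behind the leader's start offset: restart from there
          val leaderStartOffset = fetchEarliestOffsetFromLeader(tp)
          if (leaderStartOffset > replicaEndOffset)
            truncateFullyAndStartAt(tp, leaderStartOffset)
          math.max(leaderStartOffset, replicaEndOffset)
        }
      }
    }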

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 6e25f7c..e013f5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -267,7 +267,7 @@ public class FetchRequest extends AbstractRequest {
         private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
         private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
         private FetchMetadata metadata = FetchMetadata.LEGACY;
-        private List<TopicPartition> toForget = Collections.<TopicPartition>emptyList();
+        private List<TopicPartition> toForget = Collections.emptyList();
 
         public static Builder forConsumer(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
             return new Builder(ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(),
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d76d6d0..2036bb0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,7 +16,6 @@
  */
 package kafka.cluster
 
-
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import com.yammer.metrics.core.Gauge
@@ -591,26 +590,25 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
+  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
+    // The read lock is needed to handle race condition if request handler thread tries to
+    // remove future replica after receiving AlterReplicaLogDirsRequest.
     inReadLock(leaderIsrUpdateLock) {
       if (isFuture) {
-        // The read lock is needed to handle race condition if request handler thread tries to
-        // remove future replica after receiving AlterReplicaLogDirsRequest.
-        inReadLock(leaderIsrUpdateLock) {
-          getReplica(Request.FutureLocalReplicaId) match {
-            case Some(replica) => replica.log.get.appendAsFollower(records)
-            case None => // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
-          }
+        // Note the replica may be undefined if it is removed by a non-ReplicaAlterLogDirsThread before
+        // this method is called
+        getReplica(Request.FutureLocalReplicaId).map { replica =>
+          replica.log.get.appendAsFollower(records)
         }
       } else {
         // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
         // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
-        getReplicaOrException().log.get.appendAsFollower(records)
+        Some(getReplicaOrException().log.get.appendAsFollower(records))
       }
     }
   }
 
-  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
+  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
     try {
       doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
     } catch {
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 962aaff..839579b 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,6 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.server.epoch.LeaderEpochCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -52,9 +53,9 @@ class Replica(val brokerId: Int,
 
   def isLocal: Boolean = log.isDefined
 
-  def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
+  def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
-  val epochs = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e753f6e..44137cf 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
 
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.BrokerEndPoint
 import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
 import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
 import org.apache.kafka.common.requests.EpochEndOffset._
@@ -36,10 +36,11 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
 import com.yammer.metrics.core.Gauge
+import kafka.log.LogAppendInfo
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest}
 
 import scala.math._
 
@@ -50,8 +51,7 @@ abstract class AbstractFetcherThread(name: String,
                                      clientId: String,
                                      val sourceBroker: BrokerEndPoint,
                                      fetchBackOffMs: Int = 0,
-                                     isInterruptible: Boolean = true,
-                                     includeLogTruncation: Boolean)
+                                     isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
 
   type PD = FetchResponse.PartitionData[Records]
@@ -67,21 +67,31 @@ abstract class AbstractFetcherThread(name: String,
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD,
-                                     records: MemoryRecords)
+  protected def processPartitionData(topicPartition: TopicPartition,
+                                     fetchOffset: Long,
+                                     partitionData: PD): Option[LogAppendInfo]
 
-  // handle a partition whose offset is out of range and return a new fetch offset
-  protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
+  protected def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit
 
-  protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset]
-
-  protected def truncate(topicPartition: TopicPartition, epochEndOffset: EpochEndOffset): OffsetTruncationState
+  protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit
 
   protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
 
-  protected def fetch(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)]
+  protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
+
+  protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
+
+  protected def logEndOffset(topicPartition: TopicPartition): Long
+
+  protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch]
+
+  protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset]
+
+  protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)]
+
+  protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long
 
-  protected def getReplica(tp: TopicPartition): Option[Replica]
+  protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition): Long
 
   override def shutdown() {
     initiateShutdown()
@@ -97,7 +107,10 @@ abstract class AbstractFetcherThread(name: String,
 
   override def doWork() {
     maybeTruncate()
+    maybeFetch()
+  }
 
+  private def maybeFetch(): Unit = {
     val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
       val fetchStates = partitionStates.partitionStateMap.asScala
       val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
@@ -134,7 +147,7 @@ abstract class AbstractFetcherThread(name: String,
     partitionStates.partitionStates.asScala.foreach { state =>
       val tp = state.topicPartition
       if (state.value.isTruncatingLog) {
-        getReplica(tp).flatMap(_.epochs).map(_.latestEpoch) match {
+        latestEpoch(tp) match {
           case Some(latestEpoch) => partitionsWithEpochs += tp -> latestEpoch
           case None => partitionsWithoutEpochs += tp
         }
@@ -194,6 +207,12 @@ abstract class AbstractFetcherThread(name: String,
     ResultWithPartitions(fetchOffsets, partitionsWithError)
   }
 
+  private def truncate(topicPartition: TopicPartition, epochEndOffset: EpochEndOffset): OffsetTruncationState = {
+    val offsetTruncationState = getOffsetTruncationState(topicPartition, epochEndOffset)
+    truncate(topicPartition, offsetTruncationState)
+    offsetTruncationState
+  }
+
   private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
                                   fetchRequest: FetchRequest.Builder): Unit = {
     val partitionsWithError = mutable.Set[TopicPartition]()
@@ -201,7 +220,7 @@ abstract class AbstractFetcherThread(name: String,
 
     try {
       trace(s"Sending fetch request $fetchRequest")
-      responseData = fetch(fetchRequest)
+      responseData = fetchFromLeader(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning) {
@@ -220,7 +239,6 @@ abstract class AbstractFetcherThread(name: String,
     if (responseData.nonEmpty) {
       // process fetched data
       inLock(partitionMapLock) {
-
         responseData.foreach { case (topicPartition, partitionData) =>
           Option(partitionStates.stateValue(topicPartition)).foreach { currentPartitionFetchState =>
             // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
@@ -231,28 +249,31 @@ abstract class AbstractFetcherThread(name: String,
               partitionData.error match {
                 case Errors.NONE =>
                   try {
-                    val records = toMemoryRecords(partitionData.records)
-                    val newOffset = records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
-                      currentPartitionFetchState.fetchOffset)
-
-                    fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData, records)
-
-                    val validBytes = records.validBytes
-                    // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                    if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                      // Update partitionStates only if there is no exception during processPartitionData
-                      partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
-                      fetcherStats.byteRate.mark(validBytes)
+                    val logAppendInfoOpt = processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset,
+                      partitionData)
+
+                    logAppendInfoOpt.foreach { logAppendInfo =>
+                      val nextOffset = logAppendInfo.lastOffset + 1
+                      fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)
+
+                      val validBytes = logAppendInfo.validBytes
+                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
+                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
+                        // Update partitionStates only if there is no exception during processPartitionData
+                        partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(nextOffset))
+                        fetcherStats.byteRate.mark(validBytes)
+                      }
                     }
                   } catch {
                     case ime: CorruptRecordException =>
                       // we log the error and continue. This ensures two things
-                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
-                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
-                      // should get fixed in the subsequent fetches
-                      error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset}", ime)
+                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
+                      //    down and cause other topic partitions to lag as well
+                      // 2. If the message is corrupt due to a transient state in the log (truncation and partial
+                      //    writes can cause this), we simply continue and it should get fixed in subsequent fetches
+                      error(s"Found invalid messages during fetch for partition $topicPartition " +
+                        s"offset ${currentPartitionFetchState.fetchOffset}", ime)
                       partitionsWithError += topicPartition
                     case e: KafkaStorageException =>
                       error(s"Error while processing data for partition $topicPartition", e)
@@ -297,8 +318,6 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long) {
-    if (!includeLogTruncation)
-      throw new IllegalStateException("Truncation should not be requested if includeLogTruncation is disabled")
     partitionMapLock.lockInterruptibly()
     try {
       Option(partitionStates.stateValue(topicPartition)).foreach { state =>
@@ -318,9 +337,9 @@ abstract class AbstractFetcherThread(name: String,
       }.map { case (tp, initialFetchOffset) =>
         val fetchState =
           if (initialFetchOffset < 0)
-            new PartitionFetchState(handleOffsetOutOfRange(tp), includeLogTruncation)
+            new PartitionFetchState(handleOffsetOutOfRange(tp), truncatingLog = true)
           else
-            new PartitionFetchState(initialFetchOffset, includeLogTruncation)
+            new PartitionFetchState(initialFetchOffset, truncatingLog = true)
         tp -> fetchState
       }
 
@@ -341,11 +360,11 @@ abstract class AbstractFetcherThread(name: String,
   private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]) {
     val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
       .map { state =>
-        val maybeTruncationComplete = fetchOffsets.get(state.topicPartition()) match {
+        val maybeTruncationComplete = fetchOffsets.get(state.topicPartition) match {
           case Some(offsetTruncationState) => PartitionFetchState(offsetTruncationState.offset, state.value.delay, truncatingLog = !offsetTruncationState.truncationCompleted)
           case None => state.value()
         }
-        (state.topicPartition(), maybeTruncationComplete)
+        (state.topicPartition, maybeTruncationComplete)
       }.toMap
     partitionStates.set(newStates.asJava)
   }
@@ -372,57 +391,121 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param tp                    Topic partition
    * @param leaderEpochOffset     Epoch end offset received from the leader for this topic partition
-   * @param replica               Follower's replica, which is either local replica
-   *                              (ReplicaFetcherThread) or future replica (ReplicaAlterLogDirsThread)
-   * @param isFutureReplica       true if called from ReplicaAlterLogDirsThread
    */
-  def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset: EpochEndOffset, replica: Replica, isFutureReplica: Boolean = false): OffsetTruncationState = {
-    // to make sure we can distinguish log output for fetching from remote leader or local replica
-    val followerName = if (isFutureReplica) "future replica" else "follower"
-
+  private def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset: EpochEndOffset): OffsetTruncationState = {
     if (leaderEpochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
       // truncate to initial offset which is the high watermark for follower replica. For
       // future replica, it is either high watermark of the future replica or current
       // replica's truncation offset (when the current replica truncates, it forces future
       // replica's partition state to 'truncating' and sets initial offset to its truncation offset)
-      warn(s"Based on $followerName's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
+      warn(s"Based on replica's leader epoch, leader replied with an unknown offset in $tp. " +
            s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
       OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, truncationCompleted = true)
     } else if (leaderEpochOffset.leaderEpoch == UNDEFINED_EPOCH) {
       // either leader or follower or both use inter-broker protocol version < KAFKA_2_0_IV0
       // (version 0 of OffsetForLeaderEpoch request/response)
-      warn(s"Leader or $followerName is on protocol version where leader epoch is not considered in the OffsetsForLeaderEpoch response. " +
-           s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation in ${replica.topicPartition}.")
-      OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset.messageOffset), truncationCompleted = true)
+      warn(s"Leader or replica is on a protocol version where leader epoch is not considered in the OffsetsForLeaderEpoch response. " +
+           s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation in $tp.")
+      OffsetTruncationState(min(leaderEpochOffset.endOffset, logEndOffset(tp)), truncationCompleted = true)
     } else {
+      val replicaEndOffset = logEndOffset(tp)
+
       // get (leader epoch, end offset) pair that corresponds to the largest leader epoch
       // less than or equal to the requested epoch.
-      val (followerEpoch, followerEndOffset) = replica.epochs.get.endOffsetFor(leaderEpochOffset.leaderEpoch)
-      if (followerEndOffset == UNDEFINED_EPOCH_OFFSET) {
-        // This can happen if the follower was not tracking leader epochs at that point (before the
-        // upgrade, or if this broker is new). Since the leader replied with epoch <
-        // requested epoch from follower, so should be safe to truncate to leader's
-        // offset (this is the same behavior as post-KIP-101 and pre-KIP-279)
-        warn(s"Based on $followerName's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
-             s"below any $followerName's tracked epochs for ${replica.topicPartition}. " +
-             s"The leader's offset only ${leaderEpochOffset.endOffset} will be used for truncation.")
-        OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset.messageOffset), truncationCompleted = true)
-      } else if (followerEpoch != leaderEpochOffset.leaderEpoch) {
-        // the follower does not know about the epoch that leader replied with
-        // we truncate to the end offset of the largest epoch that is smaller than the
-        // epoch the leader replied with, and send another offset for leader epoch request
-        val intermediateOffsetToTruncateTo = min(followerEndOffset, replica.logEndOffset.messageOffset)
-        info(s"Based on $followerName's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
-             s"unknown to the $followerName for ${replica.topicPartition}. " +
-             s"Will truncate to $intermediateOffsetToTruncateTo and send another leader epoch request to the leader.")
-        OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false)
-      } else {
-        val offsetToTruncateTo = min(followerEndOffset, leaderEpochOffset.endOffset)
-        OffsetTruncationState(min(offsetToTruncateTo, replica.logEndOffset.messageOffset), truncationCompleted = true)
+      endOffsetForEpoch(tp, leaderEpochOffset.leaderEpoch) match {
+        case Some(OffsetAndEpoch(followerEndOffset, followerEpoch)) =>
+          if (followerEpoch != leaderEpochOffset.leaderEpoch) {
+            // the follower does not know about the epoch that leader replied with
+            // we truncate to the end offset of the largest epoch that is smaller than the
+            // epoch the leader replied with, and send another offset for leader epoch request
+            val intermediateOffsetToTruncateTo = min(followerEndOffset, replicaEndOffset)
+            info(s"Based on replica's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
+              s"unknown to the replica for $tp. " +
+              s"Will truncate to $intermediateOffsetToTruncateTo and send another leader epoch request to the leader.")
+            OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false)
+          } else {
+            val offsetToTruncateTo = min(followerEndOffset, leaderEpochOffset.endOffset)
+            OffsetTruncationState(min(offsetToTruncateTo, replicaEndOffset), truncationCompleted = true)
+          }
+        case None =>
+          // This can happen if the follower was not tracking leader epochs at that point (before the
+          // upgrade, or if this broker is new). Since the leader replied with an epoch smaller
+          // than the epoch requested by the follower, it should be safe to truncate to the
+          // leader's offset (this is the same behavior as post-KIP-101 and pre-KIP-279)
+          warn(s"Based on replica's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
+            s"below any of the replica's tracked epochs for $tp. " +
+            s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation.")
+          OffsetTruncationState(min(leaderEpochOffset.endOffset, replicaEndOffset), truncationCompleted = true)
       }
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
+    val replicaEndOffset = logEndOffset(topicPartition)
+
+    /**
+     * Unclean leader election: A follower goes down; meanwhile, the leader keeps appending messages. The follower comes back up
+     * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
+     * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
+     * and it may discover that the current leader's end offset is behind its own end offset.
+     *
+     * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
+     *
+     * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
+     */
+    val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
+    if (leaderEndOffset < replicaEndOffset) {
+      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
+      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
+      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
+      if (!isUncleanLeaderElectionAllowed(topicPartition)) {
+        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
+        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
+          s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset")
+        throw new FatalExitError
+      }
+
+      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
+        s"leader's latest offset $leaderEndOffset")
+      truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
+      leaderEndOffset
+    } else {
+      /**
+       * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
+       * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+       * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+       * 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than
+       * the new leader's log end offset. So when the old leader truncates its offset to its high watermark and starts
+       * to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that some more messages are
+       * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
+       * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
+       *
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.
+       * In the second case, the follower should just keep the current log segments and retry the fetch. There
+       * will be some inconsistency of data between the old and new leader in this case; we do not solve it here.
+       * If users want strong consistency guarantees, appropriate configurations need to be set for both
+       * brokers and producers.
+       *
+       * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
+       * and the current leader's log start offset.
+       */
+      val leaderStartOffset = fetchEarliestOffsetFromLeader(topicPartition)
+      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
+        s"leader's start offset $leaderStartOffset")
+      val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
+      // Only truncate log when current leader's log start offset is greater than follower's log end offset.
+      if (leaderStartOffset > replicaEndOffset)
+        truncateFullyAndStartAt(topicPartition, leaderStartOffset)
+      offsetToFetch
+    }
+  }
+
+
   def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
     partitionMapLock.lockInterruptibly()
     try {
@@ -459,7 +542,7 @@ abstract class AbstractFetcherThread(name: String,
     }.toMap
   }
 
-  private def toMemoryRecords(records: Records): MemoryRecords = {
+  protected def toMemoryRecords(records: Records): MemoryRecords = {
     records match {
       case r: MemoryRecords => r
       case r: FileRecords =>
@@ -587,3 +670,5 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
 
   override def toString = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
 }
+
+case class OffsetAndEpoch(offset: Long, epoch: Int)
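
To make the epoch-based truncation above concrete, here is a toy walk-through of
the intermediate-truncation branch of getOffsetTruncationState. All values are
invented for illustration, and the two case classes merely shadow the real
EpochEndOffset and OffsetTruncationState for the sake of a self-contained example:

    import scala.math.min

    case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)
    case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean)

    // Follower log: epoch 0 covers offsets [0, 10), epoch 2 covers [10, 20).
    val replicaEndOffset = 20L
    // Leader answers OffsetsForLeaderEpoch(requestedEpoch = 2) with epoch 1 ending at 15.
    val leaderReply = EpochEndOffset(leaderEpoch = 1, endOffset = 15L)
    // endOffsetForEpoch(tp, 1) returns the largest tracked epoch <= 1: epoch 0, ending at 10.
    val (followerEpoch, followerEndOffset) = (0, 10L)

    val state =
      if (followerEpoch != leaderReply.leaderEpoch)
        // the follower does not know epoch 1: truncate to the nearest smaller
        // epoch's end offset and go through another OffsetsForLeaderEpoch round
        OffsetTruncationState(min(followerEndOffset, replicaEndOffset), truncationCompleted = false)
      else
        OffsetTruncationState(min(min(followerEndOffset, leaderReply.endOffset), replicaEndOffset),
          truncationCompleted = true)

    assert(state == OffsetTruncationState(10L, truncationCompleted = false))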
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 1621201..5aec7a9 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -20,13 +20,14 @@ package kafka.server
 import java.util
 
 import kafka.api.Request
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.BrokerEndPoint
+import kafka.log.LogAppendInfo
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{MemoryRecords, Records}
+import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
@@ -44,18 +45,32 @@ class ReplicaAlterLogDirsThread(name: String,
                                 clientId = name,
                                 sourceBroker = sourceBroker,
                                 fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
-                                isInterruptible = false,
-                                includeLogTruncation = true) {
+                                isInterruptible = false) {
 
   private val replicaId = brokerConfig.brokerId
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
-  protected def getReplica(tp: TopicPartition): Option[Replica] = {
-    replicaMgr.getReplica(tp, Request.FutureLocalReplicaId)
+  override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
+    replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).epochs.map(_.latestEpoch)
   }
 
-  def fetch(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
+  override protected def logEndOffset(topicPartition: TopicPartition): Long = {
+    replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).logEndOffset.messageOffset
+  }
+
+  override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
+    val replica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+    replica.epochs.flatMap { epochCache =>
+      val (foundEpoch, foundOffset) = epochCache.endOffsetFor(epoch)
+      if (foundOffset == UNDEFINED_EPOCH_OFFSET)
+        None
+      else
+        Some(OffsetAndEpoch(foundOffset, foundEpoch))
+    }
+  }
+
+  def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
     var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
     val request = fetchRequest.build()
 
@@ -86,16 +101,18 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   // process fetched data
-  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData[Records],
-                           records: MemoryRecords) {
+  override def processPartitionData(topicPartition: TopicPartition,
+                                    fetchOffset: Long,
+                                    partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
     val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
     val partition = replicaMgr.getPartition(topicPartition).get
+    val records = toMemoryRecords(partitionData.records)
 
     if (fetchOffset != futureReplica.logEndOffset.messageOffset)
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
 
-    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
+    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
     val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
     futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
     futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
@@ -104,29 +121,17 @@ class ReplicaAlterLogDirsThread(name: String,
       removePartitions(Set(topicPartition))
 
     quota.record(records.sizeInBytes)
+    logAppendInfo
   }
 
-  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
-    val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
-    val currentReplica = replicaMgr.getReplicaOrException(topicPartition)
-    val partition = replicaMgr.getPartition(topicPartition).get
-    val logEndOffset: Long = currentReplica.logEndOffset.messageOffset
+  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true
 
-    if (logEndOffset < futureReplica.logEndOffset.messageOffset) {
-      warn("Future replica for partition %s reset its fetch offset from %d to current replica's latest offset %d"
-        .format(topicPartition, futureReplica.logEndOffset.messageOffset, logEndOffset))
-      partition.truncateTo(logEndOffset, isFuture = true)
-      logEndOffset
-    } else {
-      val currentReplicaStartOffset: Long = currentReplica.logStartOffset
-      warn("Future replica for partition %s reset its fetch offset from %d to current replica's start offset %d"
-        .format(topicPartition, futureReplica.logEndOffset.messageOffset, currentReplicaStartOffset))
-      val offsetToFetch = Math.max(currentReplicaStartOffset, futureReplica.logEndOffset.messageOffset)
-      // Only truncate the log when current replica's log start offset is greater than future replica's log end offset.
-      if (currentReplicaStartOffset > futureReplica.logEndOffset.messageOffset)
-        partition.truncateFullyAndStartAt(currentReplicaStartOffset, isFuture = true)
-      offsetToFetch
-    }
+  override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = {
+    replicaMgr.getReplicaOrException(topicPartition).logStartOffset
+  }
+
+  override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition): Long = {
+    replicaMgr.getReplicaOrException(topicPartition).logEndOffset.messageOffset
   }
 
   /**
@@ -134,7 +139,7 @@ class ReplicaAlterLogDirsThread(name: String,
    * @param partitions map of topic partition -> leader epoch of the future replica
    * @return map of topic partition -> end offset for a requested leader epoch
    */
-  def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
+  override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
     partitions.map { case (tp, epoch) =>
       try {
         val (leaderEpoch, leaderOffset) = replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch)
@@ -161,14 +166,14 @@ class ReplicaAlterLogDirsThread(name: String,
    * the future replica may miss "mark for truncation" event and must use the offset for leader epoch
    * exchange with the current replica to truncate to the largest common log prefix for the topic partition
    */
-  override def truncate(topicPartition: TopicPartition, epochEndOffset: EpochEndOffset): OffsetTruncationState = {
-    val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+  override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
     val partition = replicaMgr.getPartition(topicPartition).get
+    partition.truncateTo(truncationState.offset, isFuture = true)
+  }
 
-    val offsetTruncationState = getOffsetTruncationState(topicPartition, epochEndOffset, futureReplica,
-      isFutureReplica = true)
-    partition.truncateTo(offsetTruncationState.offset, isFuture = true)
-    offsetTruncationState
+  override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
+    val partition = replicaMgr.getPartition(topicPartition).get
+    partition.truncateFullyAndStartAt(offset, isFuture = true)
   }
 
   def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
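
Two details of the ReplicaAlterLogDirsThread changes are worth calling out. First,
its "leader" is just the current replica in another log directory on the same
broker, so truncating the future replica is always safe and
isUncleanLeaderElectionAllowed can simply return true. Second, the
endOffsetForEpoch override added here is identical to the one added to
ReplicaFetcherThread below; both convert the epoch cache's (epoch, offset) pair
into an Option[OffsetAndEpoch]. A hypothetical shared helper, not part of this
commit, could factor out that conversion:

    import kafka.cluster.Replica
    import kafka.server.OffsetAndEpoch
    import org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH_OFFSET

    // Hypothetical helper (not in the patch): both subclasses could delegate here.
    def endOffsetForEpoch(replica: Replica, requestedEpoch: Int): Option[OffsetAndEpoch] = {
      replica.epochs.flatMap { epochCache =>
        val (foundEpoch, foundOffset) = epochCache.endOffsetFor(requestedEpoch)
        if (foundOffset == UNDEFINED_EPOCH_OFFSET) None
        else Some(OffsetAndEpoch(foundOffset, foundEpoch))
      }
    }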
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5624e84..1848eb7 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,14 +18,13 @@
 package kafka.server
 
 import kafka.api._
-import kafka.cluster.{BrokerEndPoint, Replica}
-import kafka.log.LogConfig
+import kafka.cluster.BrokerEndPoint
+import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.zk.AdminZkClient
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, Records}
@@ -49,8 +48,7 @@ class ReplicaFetcherThread(name: String,
                                 clientId = name,
                                 sourceBroker = sourceBroker,
                                 fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
-                                isInterruptible = false,
-                                includeLogTruncation = true) {
+                                isInterruptible = false) {
 
   private val replicaId = brokerConfig.brokerId
   private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
@@ -88,12 +86,26 @@ class ReplicaFetcherThread(name: String,
   private val minBytes = brokerConfig.replicaFetchMinBytes
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
-  private val brokerSupportsLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
-
+  private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
   private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
 
-  protected def getReplica(tp: TopicPartition): Option[Replica] = {
-    replicaMgr.getReplica(tp)
+  override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
+    replicaMgr.getReplicaOrException(topicPartition).epochs.map(_.latestEpoch)
+  }
+
+  override protected def logEndOffset(topicPartition: TopicPartition): Long = {
+    replicaMgr.getReplicaOrException(topicPartition).logEndOffset.messageOffset
+  }
+
+  override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
+    val replica = replicaMgr.getReplicaOrException(topicPartition)
+    replica.epochs.flatMap { epochCache =>
+      val (foundEpoch, foundOffset) = epochCache.endOffsetFor(epoch)
+      if (foundOffset == UNDEFINED_EPOCH_OFFSET)
+        None
+      else
+        Some(OffsetAndEpoch(foundOffset, foundEpoch))
+    }
   }
 
   override def initiateShutdown(): Boolean = {
@@ -105,9 +117,12 @@ class ReplicaFetcherThread(name: String,
   }
 
   // process fetched data
-  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD, records: MemoryRecords) {
+  override def processPartitionData(topicPartition: TopicPartition,
+                                    fetchOffset: Long,
+                                    partitionData: PD): Option[LogAppendInfo] = {
     val replica = replicaMgr.getReplicaOrException(topicPartition)
     val partition = replicaMgr.getPartition(topicPartition).get
+    val records = toMemoryRecords(partitionData.records)
 
     maybeWarnIfOversizedRecords(records, topicPartition)
 
@@ -120,7 +135,7 @@ class ReplicaFetcherThread(name: String,
         .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
-    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
+    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
     if (isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
@@ -140,6 +155,8 @@ class ReplicaFetcherThread(name: String,
     if (quota.isThrottled(topicPartition))
       quota.record(records.sizeInBytes)
     replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
+
+    logAppendInfo
   }
 
   def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
@@ -151,79 +168,13 @@ class ReplicaFetcherThread(name: String,
         "equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
   }
 
-  /**
-   * Handle a partition whose offset is out of range and return a new fetch offset.
-   */
-  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
-    val replica = replicaMgr.getReplicaOrException(topicPartition)
-    val partition = replicaMgr.getPartition(topicPartition).get
-
-    /**
-     * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
-     * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
-     * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
-     * and it may discover that the current leader's end offset is behind its own end offset.
-     *
-     * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
-     *
-     * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
-     */
-    val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)
-
-    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
-      if (!LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
-        ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
-          s"latest offset $leaderEndOffset is less than replica's latest offset ${replica.logEndOffset.messageOffset}")
-        throw new FatalExitError
-      }
-
-      warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
-        s"leader's latest offset $leaderEndOffset")
-      partition.truncateTo(leaderEndOffset, isFuture = false)
-      replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, topicPartition, leaderEndOffset)
-      leaderEndOffset
-    } else {
-      /**
-       * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
-       * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
-       * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
-       * 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than
-       * the new leader's log end offset. So when the old leader truncates its offset to its high watermark and starts
-       * to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that some more messages are
-       * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
-       * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
-       *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
-       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
-       * start offset.
-       * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
-       * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
-       * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
-       * brokers and producers.
-       *
-       * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-       * and the current leader's log start offset.
-       *
-       */
-      val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
-      warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
-        s"leader's start offset $leaderStartOffset")
-      val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
-      // Only truncate log when current leader's log start offset is greater than follower's log end offset.
-      if (leaderStartOffset > replica.logEndOffset.messageOffset) {
-        partition.truncateFullyAndStartAt(leaderStartOffset, isFuture = false)
-      }
-      offsetToFetch
-    }
+  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
+    val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
+    LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
+      ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable
   }
 
-  protected def fetch(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
+  override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
@@ -239,7 +190,15 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
-  private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long): Long = {
+  override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = {
+    fetchOffsetFromLeader(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
+  }
+
+  override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition): Long = {
+    fetchOffsetFromLeader(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)
+  }
+
+  private def fetchOffsetFromLeader(topicPartition: TopicPartition, earliestOrLatest: Long): Long = {
     val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
         val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long))
         ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId).setTargetTimes(partitions.asJava)
@@ -299,19 +258,23 @@ class ReplicaFetcherThread(name: String,
    * Truncate the log for each partition's epoch based on leader's returned epoch and offset.
    * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState
    */
-  override def truncate(tp: TopicPartition, epochEndOffset: EpochEndOffset): OffsetTruncationState = {
+  override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
     val replica = replicaMgr.getReplicaOrException(tp)
     val partition = replicaMgr.getPartition(tp).get
+    partition.truncateTo(offsetTruncationState.offset, isFuture = false)
 
-    val offsetTruncationState = getOffsetTruncationState(tp, epochEndOffset, replica)
     if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
-      warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark ${replica.highWatermark.messageOffset}")
-    partition.truncateTo(offsetTruncationState.offset, isFuture = false)
+      warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +
+        s"${replica.highWatermark.messageOffset}")
 
     // mark the future replica for truncation only when we do last truncation
     if (offsetTruncationState.truncationCompleted)
       replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset)
-    offsetTruncationState
+  }
+
+  override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
+    val partition = replicaMgr.getPartition(topicPartition).get
+    partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
   override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
@@ -360,4 +323,5 @@ class ReplicaFetcherThread(name: String,
     val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition)
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
   }
+
 }
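
With these changes, the unclean-election guard splits cleanly between the layers:
the base class detects that the leader's end offset is behind the follower's and
decides whether to truncate or exit, while this subclass only answers whether the
topic permits unclean leader election (a ZooKeeper config lookup). A condensed
trace of the control flow, for orientation (method names are from the patch; the
flow is paraphrased, not the literal code):

    // 1. a fetch response returns OFFSET_OUT_OF_RANGE for tp
    // 2. base class runs handleOffsetOutOfRange(tp):
    //      leaderEndOffset = fetchLatestOffsetFromLeader(tp)   // ListOffsetRequest(LATEST_TIMESTAMP)
    //      if (leaderEndOffset < logEndOffset(tp) && !isUncleanLeaderElectionAllowed(tp))
    //        throw new FatalExitError                          // exit rather than lose data silently
    // 3. otherwise the log is truncated and fetching resumes from the returned offset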
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 6fcf0cc..392c912 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -88,7 +88,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
       import params._
       new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
         override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override protected def fetch(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
+        override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
           fetchRequest.fetchData.asScala.keys.toSeq.map { tp =>
             (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
               FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
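
The rewritten AbstractFetcherThreadTest below leans on the new abstract API: since
the subclass surface is now a handful of functions over per-partition state, the
test can implement them against plain in-memory structures. As a rough sketch of
the state such a MockFetcherThread keeps per partition (the real definition lies
beyond the excerpt below and also tracks things like leader epoch and log start
offset; this abbreviation is only for orientation):

    import org.apache.kafka.common.record.RecordBatch

    // Abbreviated, hypothetical shape of the test double's per-partition state.
    case class PartitionState(var log: Seq[RecordBatch] = Seq.empty,
                              var highWatermark: Long = 0L) {
      def logEndOffset: Long = log.lastOption.map(_.nextOffset).getOrElse(0L)
    }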
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 15abc68..c456433 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -17,20 +17,26 @@
 
 package kafka.server
 
+import java.nio.ByteBuffer
+
 import AbstractFetcherThread._
 import com.yammer.metrics.Metrics
-import kafka.cluster.{BrokerEndPoint, Replica}
+import kafka.cluster.BrokerEndPoint
+import kafka.log.LogAppendInfo
+import kafka.message.NoCompressionCodec
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest}
-import org.apache.kafka.common.requests.FetchResponse.PartitionData
-import org.junit.Assert.{assertFalse, assertTrue}
+import org.apache.kafka.common.utils.Time
+import org.junit.Assert._
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Set, mutable}
+import scala.util.Random
 
 class AbstractFetcherThreadTest {
 
@@ -40,174 +46,412 @@ class AbstractFetcherThreadTest {
       Metrics.defaultRegistry().removeMetric(metricName)
   }
 
+  private def allMetricsNames: Set[String] = Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
+
+  private def mkBatch(baseOffset: Long, leaderEpoch: Int, records: SimpleRecord*): RecordBatch = {
+    MemoryRecords.withRecords(baseOffset, CompressionType.NONE, leaderEpoch, records: _*)
+      .batches.asScala.head
+  }
+
   @Test
-  def testMetricsRemovedOnShutdown() {
+  def testMetricsRemovedOnShutdown(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcherThread = new DummyFetcherThread("dummy", "client", new BrokerEndPoint(0, "localhost", 9092))
-
-    fetcherThread.start()
+    val fetcher = new MockFetcherThread
 
     // add one partition to create the consumer lag metric
-    fetcherThread.addPartitions(Map(partition -> 0L))
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+    fetcher.addPartitions(Map(partition -> 0L))
+    fetcher.setLeaderState(partition, MockFetcherThread.PartitionState())
+
+    fetcher.start()
 
     // wait until all fetcher metrics are present
     TestUtils.waitUntilTrue(() =>
       allMetricsNames == Set(FetcherMetrics.BytesPerSec, FetcherMetrics.RequestsPerSec, FetcherMetrics.ConsumerLag),
       "Failed waiting for all fetcher metrics to be registered")
 
-    fetcherThread.shutdown()
+    fetcher.shutdown()
 
     // after shutdown, they should be gone
     assertTrue(Metrics.defaultRegistry().allMetrics().isEmpty)
   }
 
   @Test
-  def testConsumerLagRemovedWithPartition() {
+  def testConsumerLagRemovedWithPartition(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcherThread = new DummyFetcherThread("dummy", "client", new BrokerEndPoint(0, "localhost", 9092))
-
-    fetcherThread.start()
+    val fetcher = new MockFetcherThread
 
     // add one partition to create the consumer lag metric
-    fetcherThread.addPartitions(Map(partition -> 0L))
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+    fetcher.addPartitions(Map(partition -> 0L))
+    fetcher.setLeaderState(partition, MockFetcherThread.PartitionState())
+
+    fetcher.doWork()
 
-    // wait until lag metric is present
-    TestUtils.waitUntilTrue(() => allMetricsNames(FetcherMetrics.ConsumerLag),
-      "Failed waiting for consumer lag metric")
+    assertTrue("Failed waiting for consumer lag metric",
+      allMetricsNames(FetcherMetrics.ConsumerLag))
 
     // remove the partition to simulate leader migration
-    fetcherThread.removePartitions(Set(partition))
+    fetcher.removePartitions(Set(partition))
 
     // the lag metric should now be gone
     assertFalse(allMetricsNames(FetcherMetrics.ConsumerLag))
+  }
+
+  @Test
+  def testSimpleFetch(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+    fetcher.addPartitions(Map(partition -> 0L))
+
+    val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+    val leaderState = MockFetcherThread.PartitionState(Seq(batch), highWatermark = 2L)
+    fetcher.setLeaderState(partition, leaderState)
+
+    fetcher.doWork()
+
+    val replicaState = fetcher.replicaPartitionState(partition)
+    assertEquals(2L, replicaState.logEndOffset)
+    assertEquals(2L, replicaState.highWatermark)
+  }
+
+  @Test
+  def testTruncation(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, highWatermark = 0L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 3L))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 1, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 3, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 5, new SimpleRecord("c".getBytes)))
+
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, highWatermark = 2L)
+    fetcher.setLeaderState(partition, leaderState)
 
-    fetcherThread.shutdown()
+    TestUtils.waitUntilTrue(() => {
+      fetcher.doWork()
+      fetcher.replicaPartitionState(partition).log == fetcher.leaderPartitionState(partition).log
+    }, "Failed to reconcile leader and follower logs")
+
+    assertEquals(leaderState.logStartOffset, replicaState.logStartOffset)
+    assertEquals(leaderState.logEndOffset, replicaState.logEndOffset)
+    assertEquals(leaderState.highWatermark, replicaState.highWatermark)
   }
 
-  private def allMetricsNames = Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
+  @Test(expected = classOf[FatalExitError])
+  def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, highWatermark = 0L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 3L))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
 
-  protected def fetchRequestBuilder(partitionMap: collection.Map[TopicPartition, PartitionFetchState]): FetchRequest.Builder = {
-    val partitionData = partitionMap.map { case (tp, fetchState) =>
-      tp -> new FetchRequest.PartitionData(fetchState.fetchOffset, 0, 1024 * 1024)
-    }.toMap.asJava
-    FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, 0, 0, 1, partitionData)
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, highWatermark = 2L)
+    fetcher.setLeaderState(partition, leaderState)
+
+    // perform the initial truncation and verify that the log end offset is updated
+    fetcher.doWork()
+    assertEquals(3L, replicaState.logEndOffset)
+    assertFalse(fetcher.partitionStates.stateValue(partition).isTruncatingLog)
+
+    // To hit this case, we have to change the leader log without going through the truncation phase
+    leaderState.log.clear()
+    leaderState.logEndOffset = 0L
+    leaderState.logStartOffset = 0L
+    leaderState.highWatermark = 0L
+
+    fetcher.doWork()
   }
 
-  class DummyFetcherThread(name: String,
-                           clientId: String,
-                           sourceBroker: BrokerEndPoint,
-                           fetchBackOffMs: Int = 0)
-    extends AbstractFetcherThread(name, clientId, sourceBroker,
-      fetchBackOffMs,
-      isInterruptible = true,
-      includeLogTruncation = false) {
+  @Test
+  def testFollowerFetchOutOfRangeLow(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread
 
-    protected def getReplica(tp: TopicPartition): Option[Replica] = None
+    // The follower begins from an offset which is behind the leader's log start offset
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)))
 
-    override def processPartitionData(topicPartition: TopicPartition,
-                                      fetchOffset: Long,
-                                      partitionData: PD,
-                                      records: MemoryRecords): Unit = {}
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, highWatermark = 0L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 3L))
 
-    override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = 0L
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
 
-    override protected def fetch(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] =
-      fetchRequest.fetchData.asScala.mapValues(_ => new PartitionData[Records](Errors.NONE, 0, 0, 0,
-        Seq.empty.asJava, MemoryRecords.EMPTY)).toSeq
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, highWatermark = 2L)
+    fetcher.setLeaderState(partition, leaderState)
 
-    override protected def buildFetch(partitionMap: collection.Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
-      ResultWithPartitions(Some(fetchRequestBuilder(partitionMap)), Set())
-    }
+    // perform the initial truncation and verify that the log start offset is updated
+    fetcher.doWork()
+    assertFalse(fetcher.partitionStates.stateValue(partition).isTruncatingLog)
+    assertEquals(2, replicaState.logStartOffset)
+    assertEquals(List(), replicaState.log.toList)
 
-    override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
+    TestUtils.waitUntilTrue(() => {
+      fetcher.doWork()
+      fetcher.replicaPartitionState(partition).log == fetcher.leaderPartitionState(partition).log
+    }, "Failed to reconcile leader and follower logs")
 
-    override def truncate(tp: TopicPartition, epochEndOffset: EpochEndOffset): OffsetTruncationState = {
-      OffsetTruncationState(epochEndOffset.endOffset, truncationCompleted = true)
-    }
+    assertEquals(leaderState.logStartOffset, replicaState.logStartOffset)
+    assertEquals(leaderState.logEndOffset, replicaState.logEndOffset)
+    assertEquals(leaderState.highWatermark, replicaState.highWatermark)
   }
 
   @Test
-  def testFetchRequestCorruptedMessageException() {
+  def testCorruptMessage(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcherThread = new CorruptingFetcherThread("test", "client", new BrokerEndPoint(0, "localhost", 9092),
-      fetchBackOffMs = 1)
 
-    fetcherThread.start()
+    val fetcher = new MockFetcherThread {
+      var fetchedOnce = false
+      override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
+        val fetchedData = super.fetchFromLeader(fetchRequest)
+        if (!fetchedOnce) {
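+          // flip some bits in the fetched records so the CRC check fails on the first fetch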
+          val records = fetchedData.head._2.records.asInstanceOf[MemoryRecords]
+          val buffer = records.buffer()
+          buffer.putInt(15, buffer.getInt(15) ^ 23422)
+          buffer.putInt(30, buffer.getInt(30) ^ 93242)
+          fetchedOnce = true
+        }
+        fetchedData
+      }
+    }
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState())
+    fetcher.addPartitions(Map(partition -> 0L))
 
-    // Add one partition for fetching
-    fetcherThread.addPartitions(Map(partition -> 0L))
+    val batch = mkBatch(baseOffset = 0L, leaderEpoch = 0,
+      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+    val leaderState = MockFetcherThread.PartitionState(Seq(batch), highWatermark = 2L)
+    fetcher.setLeaderState(partition, leaderState)
 
-    // Wait until fetcherThread finishes the work
-    TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread to finish the work")
+    fetcher.doWork() // fails with corrupt record
+    fetcher.doWork() // should succeed
+
+    val replicaState = fetcher.replicaPartitionState(partition)
+    assertEquals(2L, replicaState.logEndOffset)
+  }
 
-    fetcherThread.shutdown()
+  object MockFetcherThread {
+    class PartitionState(var log: mutable.Buffer[RecordBatch],
+                         var logStartOffset: Long,
+                         var logEndOffset: Long,
+                         var highWatermark: Long)
+
+    object PartitionState {
+      def apply(log: Seq[RecordBatch], highWatermark: Long): PartitionState = {
+        val logStartOffset = log.headOption.map(_.baseOffset).getOrElse(0L)
+        val logEndOffset = log.lastOption.map(_.nextOffset).getOrElse(0L)
+        new PartitionState(log.toBuffer, logStartOffset, logEndOffset, highWatermark)
+      }
 
-    // The fetcherThread should have fetched two normal messages
-    assertTrue(fetcherThread.logEndOffset == 2)
+      def apply(): PartitionState = {
+        apply(Seq(), 0L)
+      }
+    }
   }
 
-  class CorruptingFetcherThread(name: String,
-                                clientId: String,
-                                sourceBroker: BrokerEndPoint,
-                                fetchBackOffMs: Int = 0)
-    extends DummyFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
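+  // A fetcher thread backed by in-memory partition state for both the replica and the leader,
+  // so truncation and out-of-range handling can be exercised without a real broker.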
+  class MockFetcherThread(val replicaId: Int = 0,
+                          val leaderId: Int = 1,
+                          isUncleanLeaderElectionAllowed: Boolean = true)
+    extends AbstractFetcherThread("mock-fetcher",
+      clientId = "mock-fetcher",
+      sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt())) {
 
-    @volatile var logEndOffset = 0L
-    @volatile var fetchCount = 0
+    import MockFetcherThread.PartitionState
 
-    private val normalPartitionDataSet = List[PartitionData[Records]](
-      new PartitionData(Errors.NONE, 0L, 0L, 0L, Seq.empty.asJava,
-        MemoryRecords.withRecords(0L, CompressionType.NONE, new SimpleRecord("hello".getBytes))),
-      new PartitionData(Errors.NONE, 0L, 0L, 0L, Seq.empty.asJava,
-        MemoryRecords.withRecords(1L, CompressionType.NONE, new SimpleRecord("hello".getBytes)))
-    )
+    private val replicaPartitionStates = mutable.Map[TopicPartition, PartitionState]()
+    private val leaderPartitionStates = mutable.Map[TopicPartition, PartitionState]()
+
+    def setLeaderState(topicPartition: TopicPartition, state: PartitionState): Unit = {
+      leaderPartitionStates.put(topicPartition, state)
+    }
+
+    def setReplicaState(topicPartition: TopicPartition, state: PartitionState): Unit = {
+      replicaPartitionStates.put(topicPartition, state)
+    }
+
+    def replicaPartitionState(topicPartition: TopicPartition): PartitionState = {
+      replicaPartitionStates.getOrElse(topicPartition,
+        throw new IllegalArgumentException(s"Unknown partition $topicPartition"))
+    }
+
+    def leaderPartitionState(topicPartition: TopicPartition): PartitionState = {
+      leaderPartitionStates.getOrElse(topicPartition,
+        throw new IllegalArgumentException(s"Unknown partition $topicPartition"))
+    }
 
     override def processPartitionData(topicPartition: TopicPartition,
                                       fetchOffset: Long,
-                                      partitionData: PD,
-                                      records: MemoryRecords): Unit = {
+                                      partitionData: PD): Option[LogAppendInfo] = {
+      val state = replicaPartitionState(topicPartition)
+
       // Throw exception if the fetchOffset does not match the fetcherThread partition state
-      if (fetchOffset != logEndOffset)
-        throw new RuntimeException(
-          "Offset mismatch for partition %s: fetched offset = %d, log end offset = %d."
-            .format(topicPartition, fetchOffset, logEndOffset))
+      if (fetchOffset != state.logEndOffset)
+        throw new RuntimeException(s"Offset mismatch for partition $topicPartition: " +
+          s"fetched offset = $fetchOffset, log end offset = ${state.logEndOffset}.")
 
       // Now check message's crc
-      for (batch <- records.batches.asScala) {
+      val batches = partitionData.records.batches.asScala
+      var maxTimestamp = RecordBatch.NO_TIMESTAMP
+      var offsetOfMaxTimestamp = -1L
+      var lastOffset = state.logEndOffset
+
+      for (batch <- batches) {
         batch.ensureValid()
-        logEndOffset = batch.nextOffset
+        if (batch.maxTimestamp > maxTimestamp) {
+          maxTimestamp = batch.maxTimestamp
+          offsetOfMaxTimestamp = batch.baseOffset
+        }
+        state.log.append(batch)
+        state.logEndOffset = batch.nextOffset
+        lastOffset = batch.lastOffset
+      }
+
+      state.logStartOffset = partitionData.logStartOffset
+      state.highWatermark = partitionData.highWatermark
+
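+      // summarize the append in a LogAppendInfo, mirroring what a real log append would return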
+      Some(LogAppendInfo(firstOffset = Some(fetchOffset),
+        lastOffset = lastOffset,
+        maxTimestamp = maxTimestamp,
+        offsetOfMaxTimestamp = offsetOfMaxTimestamp,
+        logAppendTime = Time.SYSTEM.milliseconds(),
+        logStartOffset = state.logStartOffset,
+        recordConversionStats = RecordConversionStats.EMPTY,
+        sourceCodec = NoCompressionCodec,
+        targetCodec = NoCompressionCodec,
+        shallowCount = batches.size,
+        validBytes = partitionData.records.sizeInBytes,
+        offsetsMonotonic = true,
+        lastOffsetOfFirstBatch = batches.headOption.map(_.lastOffset).getOrElse(-1)))
+    }
+
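+    // Drop every batch containing offsets at or beyond the truncation point and clamp the
+    // high watermark to the new log end offset.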
+    override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
+      val state = replicaPartitionState(topicPartition)
+      state.log = state.log.takeWhile { batch =>
+        batch.lastOffset < truncationState.offset
+      }
+      state.logEndOffset = state.log.lastOption.map(_.lastOffset + 1).getOrElse(state.logStartOffset)
+      state.highWatermark = math.min(state.highWatermark, state.logEndOffset)
+    }
+
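+    // Clear the replica log and reset its start offset, end offset, and high watermark
+    // to the given offset.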
+    override def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
+      val state = replicaPartitionState(topicPartition)
+      state.log.clear()
+      state.logStartOffset = offset
+      state.logEndOffset = offset
+      state.highWatermark = offset
+    }
+
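+    // Build a replica fetch request covering every partition that is ready to fetch.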
+    override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+      val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
+      partitionMap.foreach { case (partition, state) =>
+        if (state.isReadyForFetch) {
+          val replicaState = replicaPartitionState(partition)
+          fetchData.put(partition, new FetchRequest.PartitionData(state.fetchOffset, replicaState.logStartOffset, 1024 * 1024))
+        }
       }
+      val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
+      ResultWithPartitions(Some(fetchRequest), Set.empty)
     }
 
-    override protected def fetch(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
-      fetchCount += 1
-      // Set the first fetch to get a corrupted message
-      if (fetchCount == 1) {
-        val record = new SimpleRecord("hello".getBytes())
-        val records = MemoryRecords.withRecords(CompressionType.NONE, record)
-        val buffer = records.buffer
-
-        // flip some bits in the message to ensure the crc fails
-        buffer.putInt(15, buffer.getInt(15) ^ 23422)
-        buffer.putInt(30, buffer.getInt(30) ^ 93242)
-        fetchRequest.fetchData.asScala.mapValues(_ => new PartitionData[Records](Errors.NONE, 0L, 0L, 0L,
-          Seq.empty.asJava, records)).toSeq
-      } else {
-        // Then, the following fetches get the normal data
-        fetchRequest.fetchData.asScala.mapValues(v => normalPartitionDataSet(v.fetchOffset.toInt)).toSeq
+    override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
+      isUncleanLeaderElectionAllowed
+    }
+
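+    // The latest epoch comes from the last batch in the replica log; an empty log
+    // reports UNDEFINED_EPOCH.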
+    override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
+      val state = replicaPartitionState(topicPartition)
+      state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
+    }
+
+    override def logEndOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logEndOffset
+
+    override def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
+      lookupEndOffsetForEpoch(epoch, replicaPartitionState(topicPartition))
+    }
+
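+    // Scan forward through the log: the base offset of the first batch with a larger
+    // epoch is the end offset for the requested epoch.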
+    private def lookupEndOffsetForEpoch(epoch: Int, partitionState: PartitionState): Option[OffsetAndEpoch] = {
+      var epochLowerBound = EpochEndOffset.UNDEFINED_EPOCH
+      for (batch <- partitionState.log) {
+        if (batch.partitionLeaderEpoch > epoch) {
+          return Some(OffsetAndEpoch(batch.baseOffset, epochLowerBound))
+        }
+        epochLowerBound = batch.partitionLeaderEpoch
       }
+      None
     }
 
-    override protected def buildFetch(partitionMap: collection.Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
-      val requestMap = new mutable.HashMap[TopicPartition, Long]
-      partitionMap.foreach { case (topicPartition, partitionFetchState) =>
-        // Add backoff delay check
-        if (partitionFetchState.isReadyForFetch)
-          requestMap.put(topicPartition, partitionFetchState.fetchOffset)
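+    // Answer epoch end offset lookups from the in-memory leader state.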
+    override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
+      val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]()
+      partitions.foreach { case (partition, epoch) =>
+        val state = leaderPartitionState(partition)
+        val epochEndOffset = lookupEndOffsetForEpoch(epoch, state) match {
+          case Some(OffsetAndEpoch(offset, epoch)) =>
+            new EpochEndOffset(Errors.NONE, epoch, offset)
+          case None =>
+            new EpochEndOffset(Errors.NONE, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
+        }
+        endOffsets.put(partition, epochEndOffset)
       }
-      ResultWithPartitions(Some(fetchRequestBuilder(partitionMap)), Set())
+      endOffsets
     }
 
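+    // Serve fetches from the in-memory leader log; offsets outside the log
+    // get OFFSET_OUT_OF_RANGE.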
+    override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
+      fetchRequest.fetchData.asScala.map { case (partition, fetchData) =>
+        val state = leaderPartitionState(partition)
+        val (error, records) = if (fetchData.fetchOffset > state.logEndOffset || fetchData.fetchOffset < state.logStartOffset) {
+          (Errors.OFFSET_OUT_OF_RANGE, MemoryRecords.EMPTY)
+        } else {
+          // for simplicity, we fetch only one batch at a time
+          val records = state.log.find(_.baseOffset >= fetchData.fetchOffset) match {
+            case Some(batch) =>
+              val buffer = ByteBuffer.allocate(batch.sizeInBytes())
+              batch.writeTo(buffer)
+              buffer.flip()
+              MemoryRecords.readableRecords(buffer)
+
+            case None =>
+              MemoryRecords.EMPTY
+          }
+
+          (Errors.NONE, records)
+        }
+
+        (partition, new PD(error, state.highWatermark, state.highWatermark, state.logStartOffset,
+          List.empty.asJava, records))
+      }.toSeq
+    }
+
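+    // Earliest and latest offsets are read directly from the in-memory leader state.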
+    override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = {
+      leaderPartitionState(topicPartition).logStartOffset
+    }
+
+    override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition): Long = {
+      leaderPartitionState(topicPartition).logEndOffset
+    }
   }
 
 }

