kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
Date Thu, 04 Oct 2018 21:03:03 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f2dd6aa  KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
f2dd6aa is described below

commit f2dd6aa2698345fd0b0348f7bc74ce3215adf682
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Oct 4 14:02:23 2018 -0700

    KAFKA-7415; Persist leader epoch and start offset on becoming a leader (#5678)
    
    This patch ensures that when a broker becomes leader, the leader epoch cache is updated with the latest epoch, using the log end offset as that epoch's starting offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.
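    
    A minimal sketch of that scenario against the updated LeaderEpochFileCache API (illustration only, not part of the patch; the in-memory checkpoint stub is modelled on the one in LeaderEpochFileCacheTest, and the object name, topic name, epochs and offsets are made up):
    
        import kafka.server.checkpoints.LeaderEpochCheckpoint
        import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
        import org.apache.kafka.common.TopicPartition
    
        object NewLeaderEpochCacheSketch extends App {
          // In-memory checkpoint, mirroring the stub used in LeaderEpochFileCacheTest.
          val checkpoint = new LeaderEpochCheckpoint {
            private var entries: Seq[EpochEntry] = Seq.empty
            override def write(epochs: Seq[EpochEntry]): Unit = entries = epochs
            override def read(): Seq[EpochEntry] = entries
          }
    
          var logEndOffset = 20L
          val cache = new LeaderEpochFileCache(new TopicPartition("t", 0), () => logEndOffset, checkpoint)
    
          // The new leader's log only contains data from epochs 1 and 3.
          cache.assign(epoch = 1, startOffset = 0)
          cache.assign(epoch = 3, startOffset = 10)
    
          // On becoming leader for epoch 5 (after back-to-back elections), the broker now also
          // caches the new epoch with the log end offset as its start offset.
          cache.assign(epoch = 5, startOffset = logEndOffset)
    
          // A follower with data from epoch 4 (which this leader never saw) asks for epoch 4's
          // end offset and gets the leader's log end offset, so it truncates to the right point.
          println(cache.endOffsetFor(4)) // (3, 20): last cached epoch <= 4 is 3, ending where epoch 5 starts
        }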
    
    Additionally, we have made the following changes:
    
    1. The leader epoch cache enforces monotonically increasing epochs and starting offsets among its entries. Whenever a new entry is appended which violates this requirement, we remove the conflicting entries from the cache.
    2. Previously we returned an unknown epoch and offset if the queried epoch came before the first entry in the cache. Now we return the start offset of the smallest cached epoch as the end offset for the requested epoch. For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine the correct starting point of the active log range on the leader.
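    
    A minimal sketch of these two behaviors (illustration only, not part of the patch; the checkpoint stub again mirrors LeaderEpochFileCacheTest and the epochs and offsets are made up):
    
        import kafka.server.checkpoints.LeaderEpochCheckpoint
        import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
        import org.apache.kafka.common.TopicPartition
    
        object EpochCacheSemanticsSketch extends App {
          val checkpoint = new LeaderEpochCheckpoint {
            private var entries: Seq[EpochEntry] = Seq.empty
            override def write(epochs: Seq[EpochEntry]): Unit = entries = epochs
            override def read(): Seq[EpochEntry] = entries
          }
          val cache = new LeaderEpochFileCache(new TopicPartition("t", 0), () => 100L, checkpoint)
    
          cache.assign(epoch = 5, startOffset = 11)
          cache.assign(epoch = 6, startOffset = 12)
          cache.assign(epoch = 7, startOffset = 13)
    
          // (2) Querying an epoch below the earliest cached entry now returns the start offset
          // of the first known epoch rather than an unknown epoch/offset.
          println(cache.endOffsetFor(4))  // (4, 11)
    
          // (1) Appending an entry that violates monotonicity (epoch 7 restarting at an earlier
          // offset) removes the conflicting entries (6, 12) and (7, 13) from the cache.
          cache.assign(epoch = 7, startOffset = 12)
          println(cache.epochEntries)     // ListBuffer(EpochEntry(epoch=5, startOffset=11), EpochEntry(epoch=7, startOffset=12))
          println(cache.endOffsetFor(6))  // (5, 12)
        }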
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  11 +-
 core/src/main/scala/kafka/cluster/Replica.scala    |   4 +-
 core/src/main/scala/kafka/log/Log.scala            |  24 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |   4 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  | 197 +++++----
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  33 ++
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  41 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |   4 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  28 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  22 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   6 +-
 .../LeaderEpochCheckpointFileTest.scala            |   1 -
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  24 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala    | 467 ++++++++-------------
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  49 ++-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   5 +-
 17 files changed, 444 insertions(+), 478 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2036bb0..307fb81 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -301,8 +301,17 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+      // In the case of successive leader elections in a short time period, a follower may have
+      // entries in its log from a later epoch than any entry in the new leader's log. In order
+      // to ensure that these followers can truncate to the right offset, we must cache the new
+      // leader epoch and the start offset since it should be larger than any epoch that a follower
+      // would try to query.
+      leaderReplica.epochs.foreach { epochCache =>
+        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      }
+
+      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index d729dad..22860c7 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,7 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -55,7 +55,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
-  val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c9b877b..699d3d1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,15 +32,15 @@ import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrd
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -229,7 +229,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+  @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()
 
   locally {
     val startMs = time.milliseconds
@@ -239,12 +239,12 @@ class Log(@volatile var dir: File,
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
 
-    _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
+    _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
-    _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+    _leaderEpochCache.truncateFromStart(logStartOffset)
 
     // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
     // from scratch.
@@ -296,11 +296,11 @@ class Log(@volatile var dir: File,
 
   def leaderEpochCache = _leaderEpochCache
 
-  private def initializeLeaderEpochCache(): LeaderEpochCache = {
+  private def initializeLeaderEpochCache(): LeaderEpochFileCache = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
-    new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
-      new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel))
+    val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)
+    new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
   }
 
   /**
@@ -422,7 +422,7 @@ class Log(@volatile var dir: File,
    * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
    */
   private def recoverSegment(segment: LogSegment,
-                             leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+                             leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized {
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)
     val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
@@ -941,7 +941,7 @@ class Log(@volatile var dir: File,
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset to $newLogStartOffset")
           logStartOffset = newLogStartOffset
-          _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+          _leaderEpochCache.truncateFromStart(logStartOffset)
           producerStateManager.truncateHead(logStartOffset)
           updateFirstUnstableOffset()
         }
@@ -1650,7 +1650,7 @@ class Log(@volatile var dir: File,
             updateLogEndOffset(targetOffset)
             this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
             this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-            _leaderEpochCache.clearAndFlushLatest(targetOffset)
+            _leaderEpochCache.truncateFromEnd(targetOffset)
             loadProducerState(targetOffset, reloadFromCleanShutdown = false)
           }
           true
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0c00e55..80763a8 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -330,7 +330,7 @@ class LogSegment private[log] (val log: FileRecords,
    * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
    */
   @nonthreadsafe
-  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
     offsetIndex.reset()
     timeIndex.reset()
     txnIndex.reset()
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 88f5d6b..cee6bb6 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -18,53 +18,69 @@ package kafka.server.epoch
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.server.LogOffsetMetadata
 import kafka.server.checkpoints.LeaderEpochCheckpoint
 import org.apache.kafka.common.requests.EpochEndOffset._
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-import scala.collection.mutable.ListBuffer
 
-trait LeaderEpochCache {
-  def assign(leaderEpoch: Int, offset: Long)
-  def latestEpoch: Int
-  def endOffsetFor(epoch: Int): (Int, Long)
-  def clearAndFlushLatest(offset: Long)
-  def clearAndFlushEarliest(offset: Long)
-  def clearAndFlush()
-  def clear()
-}
+import scala.collection.mutable.ListBuffer
 
 /**
-  * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
-  *
-  * Leader Epoch = epoch assigned to each leader by the controller.
-  * Offset = offset of the first message in each epoch.
-  *
-  * @param leo a function that determines the log end offset
-  * @param checkpoint the checkpoint file
-  */
-class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
+ *
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ *
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ * @param logEndOffset function to fetch the current log end offset
+ */
+class LeaderEpochFileCache(topicPartition: TopicPartition,
+                           logEndOffset: () => Long,
+                           checkpoint: LeaderEpochCheckpoint) extends Logging {
+  this.logIdent = s"[LeaderEpochCache $topicPartition] "
+
   private val lock = new ReentrantReadWriteLock()
   private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) }
 
   /**
     * Assigns the supplied Leader Epoch to the supplied Offset
     * Once the epoch is assigned it cannot be reassigned
-    *
-    * @param epoch
-    * @param offset
     */
-  override def assign(epoch: Int, offset: Long): Unit = {
+  def assign(epoch: Int, startOffset: Long): Unit = {
     inWriteLock(lock) {
-      if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) {
-        info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.")
-        epochs += EpochEntry(epoch, offset)
-        flush()
+      val updateNeeded = if (epochs.isEmpty) {
+        true
       } else {
-        validateAndMaybeWarn(epoch, offset)
+        val lastEntry = epochs.last
+        lastEntry.epoch != epoch || startOffset < lastEntry.startOffset
       }
+
+      if (updateNeeded) {
+        truncateAndAppend(EpochEntry(epoch, startOffset))
+        flush()
+      }
+    }
+  }
+
+  /**
+   * Remove any entries which violate monotonicity following the insertion of an assigned epoch.
+   */
+  private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
+    validateAndMaybeWarn(entryToAppend)
+
+    val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
+      entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
+    }
+
+    epochs = retainedEpochs :+ entryToAppend
+
+    if (removedEpochs.isEmpty) {
+      debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
+    } else {
+      warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
+        s"Cache now contains ${epochs.size} entries.")
     }
   }
 
@@ -74,7 +90,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * @return
     */
-  override def latestEpoch(): Int = {
+  def latestEpoch: Int = {
     inReadLock(lock) {
       if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
     }
@@ -93,45 +109,59 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     * so that the follower falls back to High Water Mark.
     *
     * @param requestedEpoch requested leader epoch
-    * @return leader epoch and offset
+    * @return found leader epoch and end offset
     */
-  override def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
+  def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
     inReadLock(lock) {
       val epochAndOffset =
         if (requestedEpoch == UNDEFINED_EPOCH) {
-          // this may happen if a bootstrapping follower sends a request with undefined epoch or
+          // This may happen if a bootstrapping follower sends a request with undefined epoch or
           // a follower is on the older message format where leader epochs are not recorded
           (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
         } else if (requestedEpoch == latestEpoch) {
-          (requestedEpoch, leo().messageOffset)
+          // For the leader, the latest epoch is always the current leader epoch that is still being written to.
+          // Followers should not have any reason to query for the end offset of the current epoch, but a consumer
+          // might if it is verifying its committed offset following a group rebalance. In this case, we return
+          // the current log end offset which makes the truncation check work as expected.
+          (requestedEpoch, logEndOffset())
         } else {
           val (subsequentEpochs, previousEpochs) = epochs.partition { e => e.epoch > requestedEpoch}
-          if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
-            // no epochs recorded or requested epoch < the first epoch cached
+          if (subsequentEpochs.isEmpty) {
+            // The requested epoch is larger than any known epoch. This case should never be hit because
+            // the latest cached epoch is always the largest.
             (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
-          else {
-            // we must get at least one element in previous epochs list, because if we are here,
-            // it means that requestedEpoch >= epochs.head.epoch -- so at least the first epoch is
+          } else if (previousEpochs.isEmpty) {
+            // The requested epoch is smaller than any known epoch, so we return the start offset of the first
+            // known epoch which is larger than it. This may be inaccurate as there could have been
+            // epochs in between, but the point is that the data has already been removed from the log
+            // and we want to ensure that the follower can replicate correctly beginning from the leader's
+            // start offset.
+            (requestedEpoch, subsequentEpochs.head.startOffset)
+          } else {
+            // We have at least one previous epoch and one subsequent epoch. The result is the first
+            // prior epoch and the starting offset of the first subsequent epoch.
             (previousEpochs.last.epoch, subsequentEpochs.head.startOffset)
           }
         }
-      debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning epoch ${epochAndOffset._1} and offset ${epochAndOffset._2} from epoch list of size ${epochs.size}")
+      debug(s"Processed end offset request for epoch $requestedEpoch and returning epoch ${epochAndOffset._1} " +
+        s"with end offset ${epochAndOffset._2} from epoch cache of size ${epochs.size}")
       epochAndOffset
     }
   }
 
   /**
     * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
-    *
-    * @param offset
     */
-  override def clearAndFlushLatest(offset: Long): Unit = {
+  def truncateFromEnd(endOffset: Long): Unit = {
     inWriteLock(lock) {
-      val before = epochs
-      if (offset >= 0 && offset <= latestOffset()) {
-        epochs = epochs.filter(entry => entry.startOffset < offset)
+      if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) {
+        val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset >= endOffset)
+        epochs = previousEntries
+
         flush()
-        info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+
+        debug(s"Cleared entries $subsequentEntries from epoch cache after " +
+          s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.")
       }
     }
   }
@@ -142,20 +172,21 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
     *
     * This method is exclusive: so clearEarliest(6) will retain an entry at offset 6.
     *
-    * @param offset the offset to clear up to
+    * @param startOffset the offset to clear up to
     */
-  override def clearAndFlushEarliest(offset: Long): Unit = {
+  def truncateFromStart(startOffset: Long): Unit = {
     inWriteLock(lock) {
-      val before = epochs
-      if (offset >= 0 && earliestOffset() < offset) {
-        val earliest = epochs.filter(entry => entry.startOffset < offset)
-        if (earliest.nonEmpty) {
-          epochs = epochs --= earliest
-          //If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset
-          if (offset < earliestOffset() || epochs.isEmpty)
-            new EpochEntry(earliest.last.epoch, offset) +=: epochs
+      if (epochs.nonEmpty) {
+        val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset > startOffset)
+
+        previousEntries.lastOption.foreach { firstBeforeStartOffset =>
+          val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset)
+          epochs = updatedFirstEntry +: subsequentEntries
+
           flush()
-          info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition")
+
+          debug(s"Cleared entries $previousEntries and rewrote first entry $updatedFirstEntry after " +
+            s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.")
         }
       }
     }
@@ -164,47 +195,55 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
   /**
     * Delete all entries.
     */
-  override def clearAndFlush() = {
+  def clearAndFlush() = {
     inWriteLock(lock) {
       epochs.clear()
       flush()
     }
   }
 
-  override def clear() = {
+  def clear() = {
     inWriteLock(lock) {
       epochs.clear()
     }
   }
 
-  def epochEntries(): ListBuffer[EpochEntry] = {
+  // Visible for testing
+  def epochEntries: ListBuffer[EpochEntry] = {
     epochs
   }
 
-  private def earliestOffset(): Long = {
-    if (epochs.isEmpty) -1 else epochs.head.startOffset
-  }
-
-  private def latestOffset(): Long = {
-    if (epochs.isEmpty) -1 else epochs.last.startOffset
-  }
+  private def latestEntry: Option[EpochEntry] = epochs.lastOption
 
   private def flush(): Unit = {
     checkpoint.write(epochs)
   }
 
-  def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset:$latestOffset} for Partition: $topicPartition"
-
-  def validateAndMaybeWarn(epoch: Int, offset: Long) = {
-    assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}")
-    if (epoch < latestEpoch())
-      warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " +
-        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
-    else if (offset < latestOffset())
-      warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " +
-        s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}")
+  private def validateAndMaybeWarn(entry: EpochEntry) = {
+    if (entry.epoch < 0) {
+      throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
+    } else {
+      // If the latest append violates the monotonicity of epochs or starting offsets, our choices
+      // are either to raise an error, ignore the append, or allow the append and truncate the
+      // conflicting entries from the cache. Raising an error risks killing the fetcher threads in
+      // pathological cases (i.e. cases we are not yet aware of). We instead take the final approach
+      // and assume that the latest append is always accurate.
+
+      latestEntry.foreach { latest =>
+        if (entry.epoch < latest.epoch)
+          warn(s"Received leader epoch assignment $entry which has an epoch less than the epoch " +
+            s"of the latest entry $latest. This implies messages have arrived out of order.")
+        else if (entry.startOffset < latest.startOffset)
+          warn(s"Received leader epoch assignment $entry which has a starting offset which is less than " +
+            s"the starting offset of the latest entry $latest. This implies messages have arrived out of order.")
+      }
+    }
   }
 }
 
 // Mapping of epoch to the first offset of the subsequent epoch
-case class EpochEntry(epoch: Int, startOffset: Long)
+case class EpochEntry(epoch: Int, startOffset: Long) {
+  override def toString: String = {
+    s"EpochEntry(epoch=$epoch, startOffset=$startOffset)"
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 343693e..7cdc778 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -98,6 +98,39 @@ class PartitionTest {
   }
 
   @Test
+  def testMakeLeaderUpdatesEpochCache(): Unit = {
+    val controllerEpoch = 3
+    val leader = brokerId
+    val follower = brokerId + 1
+    val controllerId = brokerId + 3
+    val replicas = List[Integer](leader, follower).asJava
+    val isr = List[Integer](leader, follower).asJava
+    val leaderEpoch = 8
+
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k2".getBytes, "v2".getBytes)
+    ), leaderEpoch = 0)
+    log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5,
+      new SimpleRecord("k3".getBytes, "v3".getBytes),
+      new SimpleRecord("k4".getBytes, "v4".getBytes)
+    ), leaderEpoch = 5)
+    assertEquals(4, log.logEndOffset)
+
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertTrue("Expected makeLeader to succeed",
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch,
+        isr, 1, replicas, true), 0))
+
+    assertEquals(Some(4), partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
+
+    val epochEndOffset = partition.lastOffsetForLeaderEpoch(leaderEpoch)
+    assertEquals(4, epochEndOffset.endOffset)
+    assertEquals(leaderEpoch, epochEndOffset.leaderEpoch)
+  }
+
+  @Test
   // Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
   def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
     val latch = new CountDownLatch(1)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1381bc6..f8d76b6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -25,7 +25,7 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
-import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -287,7 +287,7 @@ class LogTest {
             }
 
             override def recover(producerStateManager: ProducerStateManager,
-                                 leaderEpochCache: Option[LeaderEpochCache]): Int = {
+                                 leaderEpochCache: Option[LeaderEpochFileCache]): Int = {
               recoveredSegments += this
               super.recover(producerStateManager, leaderEpochCache)
             }
@@ -2589,8 +2589,8 @@ class LogTest {
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
-    assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries().head)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
+    assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries.head)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -2599,7 +2599,7 @@ class LogTest {
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
     assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
-    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
   }
 
   @Test
@@ -2612,12 +2612,12 @@ class LogTest {
     log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size)
 
     log.close()
     log.delete()
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
-    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size)
   }
 
   @Test
@@ -2816,7 +2816,7 @@ class LogTest {
     for (i <- records.indices)
       log.appendAsFollower(recordsForEpoch(i))
 
-    assertEquals(42, log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache].latestEpoch())
+    assertEquals(42, log.leaderEpochCache.latestEpoch)
   }
 
   @Test
@@ -2871,19 +2871,24 @@ class LogTest {
 
   @Test
   def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
-    val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes)
+    def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
+      TestUtils.records(Seq(new SimpleRecord("value".getBytes)),
+        baseOffset = startOffset, partitionLeaderEpoch = epoch)
+    }
+
+    val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
     val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
-    //Given 2 segments, 10 messages per segment
-    for (epoch <- 1 to 20)
-      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    def append(epoch: Int, startOffset: Long, count: Int): Unit = {
+      for (i <- 0 until count)
+        log.appendAsFollower(createRecords(startOffset + i, epoch))
+    }
 
-    //Simulate some leader changes at specific offsets
-    cache.assign(0, 0)
-    cache.assign(1, 10)
-    cache.assign(2, 16)
+    //Given 2 segments, 10 messages per segment
+    append(epoch = 0, startOffset = 0, count = 10)
+    append(epoch = 1, startOffset = 10, count = 6)
+    append(epoch = 2, startOffset = 16, count = 4)
 
     assertEquals(2, log.numberOfSegments)
     assertEquals(20, log.logEndOffset)
@@ -2935,7 +2940,7 @@ class LogTest {
     assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
 
     // deliberately remove some of the epoch entries
-    leaderEpochCache.clearAndFlushLatest(2)
+    leaderEpochCache.truncateFromEnd(2)
     assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
     log.close()
 
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index c90a5b9..3dff709 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
@@ -253,7 +253,7 @@ class IsrExpirationTest {
 
   private def logMock: Log = {
     val log = EasyMock.createMock(classOf[kafka.log.Log])
-    val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
+    val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 8fb5ab6..2e28ee1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -21,7 +21,7 @@ import kafka.api.Request
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.LogManager
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.{DelayedItem, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{KafkaStorageException, ReplicaNotAvailableException}
@@ -46,7 +46,7 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
     val partition = createMock(classOf[Partition])
@@ -87,7 +87,7 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val replica = createNiceMock(classOf[Replica])
     val partition = createMock(classOf[Partition])
     val replicaManager = createMock(classOf[ReplicaManager])
@@ -133,9 +133,9 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochsT1p0 = createMock(classOf[LeaderEpochCache])
-    val leaderEpochsT1p1 = createMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache])
+    val leaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaT1p0 = createNiceMock(classOf[Replica])
     val replicaT1p1 = createNiceMock(classOf[Replica])
@@ -195,8 +195,8 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
@@ -265,8 +265,8 @@ class ReplicaAlterLogDirsThreadTest {
     val logManager = createMock(classOf[LogManager])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val partition = createMock(classOf[Partition])
     val replicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -319,8 +319,8 @@ class ReplicaAlterLogDirsThreadTest {
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[kafka.log.LogManager])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
@@ -401,8 +401,8 @@ class ReplicaAlterLogDirsThreadTest {
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replica = createNiceMock(classOf[Replica])
     val futureReplica = createNiceMock(classOf[Replica])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 520801c..9440c29 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -20,7 +20,7 @@ import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.log.LogManager
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.ClientResponse
@@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all dependencies
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -278,7 +278,7 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -339,7 +339,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -388,7 +388,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -442,7 +442,7 @@ class ReplicaFetcherThreadTest {
 
     // Setup all dependencies
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -513,7 +513,7 @@ class ReplicaFetcherThreadTest {
 
     // Setup all dependencies
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -574,7 +574,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -619,7 +619,7 @@ class ReplicaFetcherThreadTest {
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
     val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createMock(classOf[kafka.log.LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -677,7 +677,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
@@ -728,7 +728,7 @@ class ReplicaFetcherThreadTest {
 
     //Setup all stubs
     val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
     val replica = createNiceMock(classOf[Replica])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0844052..90d488d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
 import kafka.zk.KafkaZkClient
@@ -641,7 +641,7 @@ class ReplicaManagerTest {
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
-    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
+    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn((leaderEpochFromLeader, localLogOffset))
@@ -661,7 +661,7 @@ class ReplicaManagerTest {
         new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 30000),
       logDirFailureChannel = mockLogDirFailureChannel) {
 
-      override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache
+      override def leaderEpochCache: LeaderEpochFileCache = mockLeaderEpochCache
 
       override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
     }
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
index e7c6a97..0c47f15 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
@@ -24,7 +24,6 @@ import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
 
-
 class LeaderEpochCheckpointFileTest extends JUnitSuite with Logging{
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 4859019..5c37891 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -90,23 +90,23 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     assertEquals(0, latestRecord(follower).partitionLeaderEpoch())
 
     //Both leader and follower should have recorded Epoch 0 at Offset 0
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
 
     //Bounce the follower
     bounce(follower)
     awaitISR(tp)
 
     //Nothing happens yet as we haven't sent any new messages.
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
 
     //Send a message
     producer.send(new ProducerRecord(topic, 0, null, msg)).get
 
     //Epoch1 should now propagate to the follower with the written message
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
 
     //The new message should have epoch 1 stamped
     assertEquals(1, latestRecord(leader).partitionLeaderEpoch())
@@ -117,8 +117,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     awaitISR(tp)
 
     //Epochs 2 should be added to the leader, but not on the follower (yet), as there has been no replication.
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
 
     //Send a message
     producer.send(new ProducerRecord(topic, 0, null, msg)).get
@@ -128,8 +128,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     assertEquals(2, latestRecord(follower).partitionLeaderEpoch())
 
     //The leader epoch files should now match on leader and follower
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries())
-    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries)
   }
 
   @Test
@@ -377,8 +377,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def log(leader: KafkaServer, follower: KafkaServer): Unit = {
     info(s"Bounce complete for follower ${follower.config.brokerId}")
-    info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries())
-    info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries())
+    info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries)
+    info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries)
   }
 
   private def waitForLogsToMatch(b1: KafkaServer, b2: KafkaServer, partition: Int = 0): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index d1f9390..7ac606a 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -16,6 +16,7 @@
   */
 
 package kafka.server.epoch
+
 import java.io.File
 
 import kafka.server.LogOffsetMetadata
@@ -24,7 +25,7 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.{Before, Test}
+import org.junit.Test
 
 import scala.collection.mutable.ListBuffer
 
@@ -33,54 +34,44 @@ import scala.collection.mutable.ListBuffer
   */
 class LeaderEpochFileCacheTest {
   val tp = new TopicPartition("TestTopic", 5)
-  var checkpoint: LeaderEpochCheckpoint = _
+  private var logEndOffset = 0L
+  private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+    private var epochs: Seq[EpochEntry] = Seq()
+    override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs
+    override def read(): Seq[EpochEntry] = this.epochs
+  }
+  private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
 
   @Test
   def shouldAddEpochAndMessageOffsetToCache() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 10)
-    leo = 11
+    cache.assign(epoch = 2, startOffset = 10)
+    logEndOffset = 11
 
     //Then
-    assertEquals(2, cache.latestEpoch())
-    assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
-    assertEquals((2, leo), cache.endOffsetFor(2)) //should match leo
+    assertEquals(2, cache.latestEpoch)
+    assertEquals(EpochEntry(2, 10), cache.epochEntries(0))
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2)) //should match logEndOffset
   }
 
   @Test
   def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When just one epoch
-    cache.assign(epoch = 2, offset = 11)
-    cache.assign(epoch = 2, offset = 12)
-    leo = 14
+    cache.assign(epoch = 2, startOffset = 11)
+    cache.assign(epoch = 2, startOffset = 12)
+    logEndOffset = 14
 
     //Then
-    assertEquals((2, leo), cache.endOffsetFor(2))
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2))
   }
 
   @Test
   def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = {
-    def leoFinder() = new LogOffsetMetadata(0)
     val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
 
-    //Given cache with some data on leader
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     // assign couple of epochs
-    cache.assign(epoch = 2, offset = 11)
-    cache.assign(epoch = 3, offset = 12)
+    cache.assign(epoch = 2, startOffset = 11)
+    cache.assign(epoch = 3, startOffset = 12)
 
     //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
     val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -92,68 +83,51 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
     //Given
-    leo = 9
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 9
 
-    cache.assign(2, leo)
+    cache.assign(2, logEndOffset)
 
     //When called again later
     cache.assign(2, 10)
 
     //Then the offset should NOT have been updated
-    assertEquals(leo, cache.epochEntries()(0).startOffset)
+    assertEquals(logEndOffset, cache.epochEntries(0).startOffset)
+    assertEquals(ListBuffer(EpochEntry(2, 9)), cache.epochEntries)
   }
 
   @Test
-  def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
+  def shouldEnforceMonotonicallyIncreasingStartOffsets() = {
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
     cache.assign(2, 9)
 
     //When we update to a new epoch with the same start offset
     cache.assign(3, 9)
 
     //Then epoch should have been updated
-    assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries())
+    assertEquals(ListBuffer(EpochEntry(3, 9)), cache.epochEntries)
   }
   
   @Test
   def shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned() = {
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => new LogOffsetMetadata(0), checkpoint)
     cache.assign(2, 6)
 
     //When called again later with a greater offset
     cache.assign(2, 10)
 
     //Then later update should have been ignored
-    assertEquals(6, cache.epochEntries()(0).startOffset)
+    assertEquals(6, cache.epochEntries(0).startOffset)
   }
 
   @Test
   def shouldReturnUnsupportedIfNoEpochRecorded(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0))
   }
 
   @Test
   def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){
-    val leo = 73
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 73
 
     //When (say a follower on an older message format version) sends a request for UNDEFINED_EPOCH
     val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
@@ -164,39 +138,41 @@ class LeaderEpochFileCacheTest {
   }
 
   @Test
-  def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
-    cache.assign(epoch = 5, offset = 11)
-    cache.assign(epoch = 6, offset = 12)
-    cache.assign(epoch = 7, offset = 13)
+  def shouldReturnFirstEpochIfRequestedEpochLessThanFirstEpoch(){
+    cache.assign(epoch = 5, startOffset = 11)
+    cache.assign(epoch = 6, startOffset = 12)
+    cache.assign(epoch = 7, startOffset = 13)
 
     //When
-    val epochAndOffset = cache.endOffsetFor(5 - 1)
+    val epochAndOffset = cache.endOffsetFor(4)
 
     //Then
-    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffset)
+    assertEquals((4, 11), epochAndOffset)
   }
 
   @Test
-  def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
+  def shouldTruncateIfMatchingEpochButEarlierStartingOffset(): Unit = {
+    cache.assign(epoch = 5, startOffset = 11)
+    cache.assign(epoch = 6, startOffset = 12)
+    cache.assign(epoch = 7, startOffset = 13)
 
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    // epoch 7 starts at an earlier offset
+    cache.assign(epoch = 7, startOffset = 12)
 
+    assertEquals((5, 12), cache.endOffsetFor(5))
+    assertEquals((5, 12), cache.endOffsetFor(6))
+  }
+
+  @Test
+  def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
     //When several epochs
-    cache.assign(epoch = 1, offset = 11)
-    cache.assign(epoch = 1, offset = 12)
-    cache.assign(epoch = 2, offset = 13)
-    cache.assign(epoch = 2, offset = 14)
-    cache.assign(epoch = 3, offset = 15)
-    cache.assign(epoch = 3, offset = 16)
-    leo = 17
+    cache.assign(epoch = 1, startOffset = 11)
+    cache.assign(epoch = 1, startOffset = 12)
+    cache.assign(epoch = 2, startOffset = 13)
+    cache.assign(epoch = 2, startOffset = 14)
+    cache.assign(epoch = 3, startOffset = 15)
+    cache.assign(epoch = 3, startOffset = 16)
+    logEndOffset = 17
 
     //Then get the start offset of the next epoch
     assertEquals((2, 15), cache.endOffsetFor(2))
@@ -204,15 +180,10 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(){
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 0, offset = 10)
-    cache.assign(epoch = 2, offset = 13)
-    cache.assign(epoch = 4, offset = 17)
+    cache.assign(epoch = 0, startOffset = 10)
+    cache.assign(epoch = 2, startOffset = 13)
+    cache.assign(epoch = 4, startOffset = 17)
 
     //Then
     assertEquals((0, 13), cache.endOffsetFor(requestedEpoch = 1))
@@ -222,14 +193,9 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 2, offset = 7)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 2, startOffset = 7)
 
     //Then
     assertEquals(1, cache.epochEntries.size)
@@ -238,14 +204,10 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = {
-    val leo = 100
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    logEndOffset = 100
 
     //When
-    cache.assign(epoch = 2, offset = 100)
+    cache.assign(epoch = 2, startOffset = 100)
 
     //Then
     assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3))
@@ -253,35 +215,28 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldSupportEpochsThatDoNotStartFromZero(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When
-    cache.assign(epoch = 2, offset = 6)
-    leo = 7
+    cache.assign(epoch = 2, startOffset = 6)
+    logEndOffset = 7
 
     //Then
-    assertEquals((2, leo), cache.endOffsetFor(2))
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2))
     assertEquals(1, cache.epochEntries.size)
-    assertEquals(EpochEntry(2, 6), cache.epochEntries()(0))
+    assertEquals(EpochEntry(2, 6), cache.epochEntries(0))
   }
 
   @Test
   def shouldPersistEpochsBetweenInstances(){
-    def leoFinder() = new LogOffsetMetadata(0)
     val checkpointPath = TestUtils.tempFile().getAbsolutePath
-    checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
+    val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
 
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
+    val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
+    cache.assign(epoch = 2, startOffset = 6)
 
     //When
     val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
-    val cache2 = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint2)
+    val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2)
 
     //Then
     assertEquals(1, cache2.epochEntries.size)
@@ -289,81 +244,68 @@ class LeaderEpochFileCacheTest {
   }
 
   @Test
-  def shouldNotLetEpochGoBackwardsEvenIfMessageEpochsDo(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
+  def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
     //Given
-    cache.assign(epoch = 1, offset = 5); leo = 6
-    cache.assign(epoch = 2, offset = 6); leo = 7
-
-    //When we update an epoch in the past with an earlier offset
-    cache.assign(epoch = 1, offset = 7); leo = 8
+    cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
+    cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
 
-    //Then epoch should not be changed
-    assertEquals(2, cache.latestEpoch())
+    //When we update an epoch in the past with a different offset, the log has already reached
+    //an inconsistent state. Our options are either to raise an error, ignore the new append,
+    //or truncate the cached epochs to the point of conflict. We take this latter approach in
+    //order to guarantee that epochs and offsets in the cache increase monotonically, which makes
+    //the search logic simpler to reason about.
+    cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8
 
-    //Then end offset for epoch 1 shouldn't have changed
-    assertEquals((1, 6), cache.endOffsetFor(1))
+    //Then later epochs will be removed
+    assertEquals(1, cache.latestEpoch)
 
-    //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't think of a better option)
-    assertEquals((2, 8), cache.endOffsetFor(2))
+    //Then end offset for epoch 1 will have changed
+    assertEquals((1, 8), cache.endOffsetFor(1))
 
-    //Epoch history shouldn't have changed
-    assertEquals(EpochEntry(1, 5), cache.epochEntries()(0))
-    assertEquals(EpochEntry(2, 6), cache.epochEntries()(1))
+    //Then end offset for epoch 2 is now undefined
+    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2))
+    assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
   }
 
   @Test
-  def shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress() = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
+  def shouldEnforceOffsetsIncreaseMonotonically() = {
     //When epoch goes forward but offset goes backwards
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 5)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 5)
 
-    //Then latter assign should be ignored
-    assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0))
+    //The last assignment wins and the conflicting entry is removed from the cache
+    assertEquals(EpochEntry(3, 5), cache.epochEntries.toList(0))
   }
 
   @Test
   def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 0, offset = 0) //leo=0
+    cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0
 
     //When
-    cache.assign(epoch = 1, offset = 0) //leo=0
+    cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0
 
     //Then epoch should go up
-    assertEquals(1, cache.latestEpoch())
+    assertEquals(1, cache.latestEpoch)
     //offset for 1 should still be 0
     assertEquals((1, 0), cache.endOffsetFor(1))
     //offset for epoch 0 should still be 0
     assertEquals((0, 0), cache.endOffsetFor(0))
 
     //When we write 5 messages as epoch 1
-    leo = 5
+    logEndOffset = 5
 
-    //Then end offset for epoch(1) should be leo => 5
+    //Then end offset for epoch(1) should be logEndOffset => 5
     assertEquals((1, 5), cache.endOffsetFor(1))
     //Epoch 0 should still be at offset 0
     assertEquals((0, 0), cache.endOffsetFor(0))
 
     //When
-    cache.assign(epoch = 2, offset = 5) //leo=5
+    cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5
 
-    leo = 10 //write another 5 messages
+    logEndOffset = 10 //write another 5 messages
 
-    //Then end offset for epoch(2) should be leo => 10
+    //Then end offset for epoch(2) should be logEndOffset => 10
     assertEquals((2, 10), cache.endOffsetFor(2))
 
     //end offset for epoch(1) should be the start offset of epoch(2) => 5
@@ -375,36 +317,30 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //When new
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //When Messages come in
-    cache.assign(epoch = 0, offset = 0); leo = 1
-    cache.assign(epoch = 0, offset = 1); leo = 2
-    cache.assign(epoch = 0, offset = 2); leo = 3
+    cache.assign(epoch = 0, startOffset = 0); logEndOffset = 1
+    cache.assign(epoch = 0, startOffset = 1); logEndOffset = 2
+    cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3
 
     //Then epoch should stay, offsets should grow
-    assertEquals(0, cache.latestEpoch())
-    assertEquals((0, leo), cache.endOffsetFor(0))
+    assertEquals(0, cache.latestEpoch)
+    assertEquals((0, logEndOffset), cache.endOffsetFor(0))
 
     //When messages arrive with greater epoch
-    cache.assign(epoch = 1, offset = 3); leo = 4
-    cache.assign(epoch = 1, offset = 4); leo = 5
-    cache.assign(epoch = 1, offset = 5); leo = 6
+    cache.assign(epoch = 1, startOffset = 3); logEndOffset = 4
+    cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5
+    cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6
 
-    assertEquals(1, cache.latestEpoch())
-    assertEquals((1, leo), cache.endOffsetFor(1))
+    assertEquals(1, cache.latestEpoch)
+    assertEquals((1, logEndOffset), cache.endOffsetFor(1))
 
     //When
-    cache.assign(epoch = 2, offset = 6); leo = 7
-    cache.assign(epoch = 2, offset = 7); leo = 8
-    cache.assign(epoch = 2, offset = 8); leo = 9
+    cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7
+    cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8
+    cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9
 
-    assertEquals(2, cache.latestEpoch())
-    assertEquals((2, leo), cache.endOffsetFor(2))
+    assertEquals(2, cache.latestEpoch)
+    assertEquals((2, logEndOffset), cache.endOffsetFor(2))
 
     //Older epochs should return the start offset of the first message in the subsequent epoch.
     assertEquals((0, 3), cache.endOffsetFor(0))
@@ -413,16 +349,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When clear latest on epoch boundary
-    cache.clearAndFlushLatest(offset = 8)
+    cache.truncateFromEnd(endOffset = 8)
 
     //Then should remove two latest epochs (remove is inclusive)
     assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
@@ -430,16 +363,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset ON epoch boundary
-    cache.clearAndFlushEarliest(offset = 8)
+    cache.truncateFromStart(startOffset = 8)
 
     //Then should preserve (3, 8)
     assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -447,16 +377,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
 
     //Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -464,16 +391,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotClearAnythingIfOffsetToEarly(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset before first epoch offset
-    cache.clearAndFlushEarliest(offset = 1)
+    cache.truncateFromStart(startOffset = 1)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -481,16 +405,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on earliest epoch boundary
-    cache.clearAndFlushEarliest(offset = 6)
+    cache.truncateFromStart(startOffset = 6)
 
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -498,16 +419,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldRetainLatestEpochOnClearAllEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When
-    cache.clearAndFlushEarliest(offset = 11)
+    cache.truncateFromStart(startOffset = 11)
 
     //Then retain the last
     assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
@@ -515,16 +433,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When we clear from a position between offset 8 & offset 11
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
 
     //Then we should update the middle epoch entry's offset
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -532,16 +447,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 0, offset = 0)
-    cache.assign(epoch = 1, offset = 7)
-    cache.assign(epoch = 2, offset = 10)
+    cache.assign(epoch = 0, startOffset = 0)
+    cache.assign(epoch = 1, startOffset = 7)
+    cache.assign(epoch = 2, startOffset = 10)
 
     //When we clear from a position between offset 0 & offset 7
-    cache.clearAndFlushEarliest(offset = 5)
+    cache.truncateFromStart(startOffset = 5)
 
     //Then we should keep epoch 0 but update the offset appropriately
     assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -549,16 +461,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset beyond last epoch
-    cache.clearAndFlushEarliest(offset = 15)
+    cache.truncateFromStart(startOffset = 15)
 
     //Then update the last
     assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
@@ -566,51 +475,42 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushLatest(offset = 9)
+    cache.truncateFromEnd(endOffset = 9)
 
     //Then should keep the preceding epochs
-    assertEquals(3, cache.latestEpoch())
+    assertEquals(3, cache.latestEpoch)
     assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
   }
 
   @Test
   def shouldClearAllEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
-    //When 
+    //When
     cache.clearAndFlush()
 
-    //Then 
+    //Then
     assertEquals(0, cache.epochEntries.size)
   }
 
   @Test
   def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -618,16 +518,13 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
 
     //When reset to offset on epoch boundary
-    cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
 
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -635,54 +532,26 @@ class LeaderEpochFileCacheTest {
 
   @Test
   def shouldFetchLatestEpochOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals(-1, cache.latestEpoch)
   }
 
   @Test
   def shouldFetchEndOffsetOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7))
   }
 
   @Test
   def shouldClearEarliestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushEarliest(7)
+    cache.truncateFromStart(7)
   }
 
   @Test
   def shouldClearLatestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushLatest(7)
+    cache.truncateFromEnd(7)
   }
 
-  @Before
-  def setUp() {
-    checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
-  }
 }
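
(For readers following the test changes above: the assertions pin down two behaviors of the reworked cache. First, assign() drops any cached entries that would break monotonicity of epochs and start offsets before appending the new entry. Second, endOffsetFor() returns the log end offset for the latest epoch, the next entry's start offset for an intermediate epoch, and, for an epoch older than the first cached entry, the requested epoch paired with that first entry's start offset. The sketch below is a simplified illustration of those semantics only; SketchEpochCache and its members are invented names, not the actual LeaderEpochFileCache implementation, which additionally handles locking, logging and checkpoint flushing.)

import scala.collection.mutable.ListBuffer

case class EpochEntry(epoch: Int, startOffset: Long)

// Minimal in-memory stand-in for the leader epoch cache semantics asserted above.
class SketchEpochCache(logEndOffset: () => Long) {
  private val UndefinedEpoch = -1
  private val UndefinedOffset = -1L
  private var epochs = ListBuffer.empty[EpochEntry]

  def latestEpoch: Int = epochs.lastOption.map(_.epoch).getOrElse(UndefinedEpoch)

  def assign(epoch: Int, startOffset: Long): Unit = {
    // Re-assigning the latest epoch with an equal or later start offset is a no-op.
    val updateNeeded = epochs.lastOption.forall { last =>
      last.epoch != epoch || startOffset < last.startOffset
    }
    if (updateNeeded) {
      // Remove every entry that conflicts with the new one, so that both epochs and
      // start offsets remain strictly increasing, then append.
      epochs = epochs.filter(e => e.epoch < epoch && e.startOffset < startOffset)
      epochs += EpochEntry(epoch, startOffset)
    }
  }

  def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
    if (requestedEpoch == UndefinedEpoch || epochs.isEmpty)
      (UndefinedEpoch, UndefinedOffset)
    else if (requestedEpoch == latestEpoch)
      (requestedEpoch, logEndOffset())             // the active epoch ends at the log end offset
    else {
      val (later, earlier) = epochs.partition(_.epoch > requestedEpoch)
      if (later.isEmpty)
        (UndefinedEpoch, UndefinedOffset)          // epoch newer than anything tracked
      else if (earlier.isEmpty)
        (requestedEpoch, later.head.startOffset)   // epoch older than the first cached entry
      else
        (earlier.last.epoch, later.head.startOffset)
    }
  }
}

// Mirrors shouldEnforceMonotonicallyIncreasingEpochs above.
object SketchEpochCacheDemo extends App {
  var logEnd = 0L
  val cache = new SketchEpochCache(() => logEnd)
  cache.assign(1, 5); logEnd = 6
  cache.assign(2, 6); logEnd = 7
  cache.assign(1, 7); logEnd = 8
  assert(cache.latestEpoch == 1)
  assert(cache.endOffsetFor(1) == (1, 8L))
  assert(cache.endOffsetFor(2) == (-1, -1L))
}
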
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 5ad641f..efc0717 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -16,7 +16,7 @@
   */
 package kafka.server.epoch
 
-import java.util.{Optional, Map => JMap}
+import java.util.Optional
 
 import kafka.server.KafkaConfig._
 import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
@@ -37,9 +37,10 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.mutable.ListBuffer
 
 class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
-  var brokers: Seq[KafkaServer] = null
+  var brokers: ListBuffer[KafkaServer] = ListBuffer()
   val topic1 = "foo"
   val topic2 = "bar"
   val t1p0 = new TopicPartition(topic1, 0)
@@ -60,7 +61,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
-    brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers ++= (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     // Given two topics with replication of a single partition
     for (topic <- List(topic1, topic2)) {
@@ -94,7 +95,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
 
     //3 brokers, put partition on 100/101 and then pretend to be 102
-    brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers ++= (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
     TestUtils.createTopic(zkClient, topic1, assignment1, brokers)
@@ -138,9 +139,12 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
-
     //Setup: we are only interested in the single partition on broker 101
-    brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers += createServer(fromProps(createBrokerConfig(100, zkConnect)))
+    assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
+
+    brokers += createServer(fromProps(createBrokerConfig(101, zkConnect)))
+
     def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
     TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
     producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
@@ -150,10 +154,10 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     var fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
     //Then epoch should be 0 and leo: 1
-    var offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
-    assertEquals(leo(), offset)
-
+    var epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(0, epochEndOffset.leaderEpoch)
+    assertEquals(1, epochEndOffset.endOffset)
+    assertEquals(1, leo())
 
     //2. When broker is bounced
     brokers(1).shutdown()
@@ -162,15 +166,23 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
-
     //Then the end offset for epoch 0 should still be the start offset of epoch 1
-    offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
-
-    //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 - This is because we have to first change leader to -1 and then change it again to the live replica)
-    assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
-    assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
-
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
+    assertEquals(0, epochEndOffset.leaderEpoch)
+
+    //No data written in epoch 1
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 1))(tp)
+    assertEquals(0, epochEndOffset.leaderEpoch)
+    assertEquals(1, epochEndOffset.endOffset)
+
+    //Then epoch 2 should be the leo (NB: the leader epoch goes up in increments of 2 -
+    //this is because the leader first changes to -1 and then changes again to the live replica)
+    //Note that the expected leader changes depend on the controller being on broker 100, which is not restarted
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 2))(tp)
+    assertEquals(2, epochEndOffset.leaderEpoch)
+    assertEquals(2, epochEndOffset.endOffset)
+    assertEquals(2, leo())
 
     //3. When broker is bounced again
     brokers(1).shutdown()
@@ -179,7 +191,6 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
 
-
     //Then Epoch 0 should still map to offset 1
     assertEquals(1, fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset())
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 4fdc4d2..86a087b 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -47,7 +47,7 @@ class OffsetsForLeaderEpochTest {
 
     //Stubs
     val mockLog = createNiceMock(classOf[kafka.log.Log])
-    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[kafka.log.LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4dc822b..df9902f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -373,10 +373,11 @@ object TestUtils extends Logging {
               producerId: Long = RecordBatch.NO_PRODUCER_ID,
               producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
               sequence: Int = RecordBatch.NO_SEQUENCE,
-              baseOffset: Long = 0L): MemoryRecords = {
+              baseOffset: Long = 0L,
+              partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset,
-      System.currentTimeMillis, producerId, producerEpoch, sequence)
+      System.currentTimeMillis, producerId, producerEpoch, sequence, false, partitionLeaderEpoch)
     records.foreach(builder.append)
     builder.build()
   }
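
(A hypothetical call site for the extended helper above, assuming the usual test imports; the record contents, base offset and epoch value below are invented for illustration.)

import kafka.utils.TestUtils
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}

// Build a batch stamped with partition leader epoch 5 instead of the default
// RecordBatch.NO_PARTITION_LEADER_EPOCH; the remaining parameters keep their defaults.
val batch: MemoryRecords = TestUtils.records(
  Seq(new SimpleRecord("key".getBytes, "value".getBytes)),
  baseOffset = 10L,
  partitionLeaderEpoch = 5)
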

