kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-4485; Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
Date Wed, 21 Dec 2016 05:31:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ecc0fc447 -> fc88bac66


KAFKA-4485; Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #2208 from lindong28/KAFKA-4485
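
At a glance, the fix replaces the old per-fetch "read to the end of the log" boolean with a per-follower lastCaughtUpTimeMs. As a rough illustration of why the old criterion can starve a follower that is effectively keeping up, here is a minimal standalone Scala sketch (hypothetical names, made-up offsets and times, not the Kafka classes):

    object IsrCriterionSketch extends App {
      // One fetch as seen by the leader: offset asked for, leader LEO at that moment, time.
      case class Fetch(fetchOffset: Long, leaderLEO: Long, timeMs: Long)

      // The leader appends one record between consecutive fetches, so every fetch asks for
      // the previous LEO while the current LEO is already one ahead.
      val fetches = (1 to 5).map(i => Fetch(fetchOffset = i - 1, leaderLEO = i, timeMs = i * 100L))

      // Old criterion: did this fetch read up to the leader's LEO right now?
      println(fetches.map(f => f.fetchOffset >= f.leaderLEO))
      // Vector(false, false, false, false, false) -- never considered caught up

      // New criterion: credit the follower with the time of the previous fetch once it has
      // reached the LEO the leader had when that previous fetch was served.
      println(fetches.sliding(2).collect {
        case Seq(prev, cur) if cur.fetchOffset >= prev.leaderLEO => prev.timeMs
      }.toList)
      // List(100, 200, 300, 400) -- lastCaughtUpTimeMs keeps advancing, so the lag stays small
    }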


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc88bac6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc88bac6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc88bac6

Branch: refs/heads/trunk
Commit: fc88bac66686ef963b5aec53a3eba717336e4ba7
Parents: ecc0fc4
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Dec 20 21:31:50 2016 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Dec 20 21:31:50 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/ProducerConfig.java  |  4 +-
 .../main/scala/kafka/cluster/Partition.scala    | 35 +++++++++++---
 core/src/main/scala/kafka/cluster/Replica.scala | 42 +++++++++++++----
 .../scala/kafka/server/ReplicaManager.scala     | 49 +++++++++++++-------
 .../unit/kafka/server/ISRExpirationTest.scala   | 36 ++++++++++----
 .../unit/kafka/server/SimpleFetchTest.scala     |  6 ++-
 6 files changed, 130 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc88bac6/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 4b0e999..39446f5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -210,7 +210,9 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>request.timeout.ms</code> */
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
+                                                        + " request.timeout.ms should be larger than replica.lag.time.max.ms, a broker side configuration,"
+                                                        + " to reduce message duplication caused by unnecessary producer retry.";
 
     /** <code>interceptor.classes</code> */
     public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
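
The doc string added above encodes a timing relationship worth spelling out: with acks=all, a produce request may have to wait roughly replica.lag.time.max.ms for a slow follower to be dropped from the ISR before the write can be acknowledged, so a producer whose request.timeout.ms is smaller could time out and retry, creating duplicates. A hedged illustration of producer settings consistent with that advice (values are illustrative; 10000 ms is the broker default for replica.lag.time.max.ms and 30000 ms the producer default for request.timeout.ms):

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    object ProducerTimeoutExample extends App {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      props.put(ProducerConfig.ACKS_CONFIG, "all")
      // Keep this above the broker's replica.lag.time.max.ms (10000 ms by default) so the
      // producer does not retry while the leader is still waiting to shrink the ISR.
      props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")
      println(props)
    }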

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc88bac6/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d555b73..8a19f17 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -198,6 +198,13 @@ class Partition(val topic: String,
           true
         }
       val leaderReplica = getReplica().get
+      val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
+      val curTimeMs = time.milliseconds
+      // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
+      (assignedReplicas() - leaderReplica).foreach{replica =>
+        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
+        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
+      }
       // we may need to increment high watermark since ISR could be down to 1
       if (isNewLeader) {
         // construct the high watermark metadata for the new leader replica
@@ -251,7 +258,7 @@ class Partition(val topic: String,
         replica.updateLogReadResult(logReadResult)
         // check if we need to expand ISR to include this replica
         // if it is not in the ISR yet
-        maybeExpandIsr(replicaId)
+        maybeExpandIsr(replicaId, logReadResult)
 
         debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
           .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
@@ -268,10 +275,15 @@ class Partition(val topic: String,
 
   /**
    * Check and maybe expand the ISR of the partition.
+   * A replica will be added to ISR if its LEO >= current hw of the partition.
+   *
+   * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs,
+   * even if its log end offset is >= HW. However, to be consistent with how the follower determines
+   * whether a replica is in-sync, we only check HW.
    *
    * This function can be triggered when a replica's LEO has incremented
    */
-  def maybeExpandIsr(replicaId: Int) {
+  def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
     val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
       leaderReplicaIfLocal() match {
@@ -280,7 +292,7 @@ class Partition(val topic: String,
           val leaderHW = leaderReplica.highWatermark
           if(!inSyncReplicas.contains(replica) &&
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
-                  replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
+             replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for partition [%s,%d] from %s to %s"
                          .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
@@ -291,8 +303,8 @@ class Partition(val topic: String,
           }
 
           // check if the HW of the partition can now be incremented
-          // since the replica maybe now be in the ISR and its LEO has just incremented
-          maybeIncrementLeaderHW(leaderReplica)
+          // since the replica may already be in the ISR and its LEO has just incremented
+          maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
 
         case None => false // nothing to do if no longer leader
       }
@@ -356,12 +368,21 @@ class Partition(val topic: String,
    * 1. Partition ISR changed
    * 2. Any replica's LEO changed
    *
+   * The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
+   * This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this
+   * replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the
+   * leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the
+   * follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore
+   * will never be added to ISR.
+   *
    * Returns true if the HW was incremented, and false otherwise.
    * Note There is no need to acquire the leaderIsrUpdate lock here
    * since all callers of this private API acquire that lock
    */
-  private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
-    val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
+  private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
+    val allLogEndOffsets = assignedReplicas.filter{replica =>
+      curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
+    }.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
     if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
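
To illustrate the two rules above (a follower is added to the ISR once its LEO reaches the partition HW, and the HW is the smallest LEO over replicas that are either in the ISR or still considered caught up within replicaLagTimeMaxMs), here is a minimal standalone sketch with hypothetical names and made-up offsets, not Partition.scala itself:

    object HighWatermarkSketch extends App {
      case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

      // Mirrors the filter in maybeIncrementLeaderHW: in-sync replicas plus replicas whose
      // lastCaughtUpTimeMs is within replicaLagTimeMaxMs hold the HW back to their LEO.
      def candidateHighWatermark(assigned: Seq[ReplicaState], inSyncIds: Set[Int],
                                 nowMs: Long, replicaLagTimeMaxMs: Long): Long =
        assigned
          .filter(r => inSyncIds.contains(r.brokerId) ||
                       nowMs - r.lastCaughtUpTimeMs <= replicaLagTimeMaxMs)
          .map(_.logEndOffset)
          .min

      // ISR = {leader} only; a caught-up follower at LEO 12 keeps the HW at 12 rather than
      // the leader's 20, so the follower can reach the HW and be re-added to the ISR.
      val leader   = ReplicaState(brokerId = 0, logEndOffset = 20, lastCaughtUpTimeMs = 1000)
      val follower = ReplicaState(brokerId = 1, logEndOffset = 12, lastCaughtUpTimeMs = 900)
      val hw = candidateHighWatermark(Seq(leader, follower), inSyncIds = Set(0),
                                      nowMs = 1000, replicaLagTimeMaxMs = 10000)
      println(hw)                              // 12
      println(follower.logEndOffset >= hw)     // true -> maybeExpandIsr would add broker 1
    }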

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc88bac6/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 40cf181..4d90815 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -36,25 +36,51 @@ class Replica(val brokerId: Int,
   // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
   @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
 
+  // The log end offset value at the time the leader receives last FetchRequest from this follower.
+  // This is used to determine the lastCaughtUpTimeMs of the follower
+  @volatile private var lastFetchLeaderLogEndOffset: Long = 0L
+
+  // The time when the leader receives last FetchRequest from this follower
+  // This is used to determine the lastCaughtUpTimeMs of the follower
+  @volatile private var lastFetchTimeMs: Long = 0L
+
   val topic = partition.topic
   val partitionId = partition.partitionId
 
   def isLocal: Boolean = log.isDefined
 
-  private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
+  // lastCaughtUpTimeMs is the largest time t such that the begin offset of most recent FetchRequest from this follower >=
+  // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
+  private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(0L)
 
   def lastCaughtUpTimeMs = lastCaughtUpTimeMsUnderlying.get()
 
+  /*
+   * If the FetchRequest reads up to the log end offset of the leader when the current fetch request was received,
+   * set the lastCaughtUpTimeMsUnderlying to the time when the current fetch request was received.
+   *
+   * Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
+   * set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received.
+   *
+   * This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO
+   * by at most replicaLagTimeMaxMs. This semantics allows a follower to be added to the ISR even if offset of its fetch request is
+   * always smaller than leader's LEO, which can happen if there are constant small produce requests at high frequency.
+   */
   def updateLogReadResult(logReadResult : LogReadResult) {
+    if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
+      lastCaughtUpTimeMsUnderlying.set(logReadResult.fetchTimeMs)
+    else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
+      lastCaughtUpTimeMsUnderlying.set(lastFetchTimeMs)
+
     logEndOffset = logReadResult.info.fetchOffsetMetadata
+    lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
+    lastFetchTimeMs = logReadResult.fetchTimeMs
+  }
 
-    /* If the request read up to the log end offset snapshot when the read was initiated,
-     * set the lastCaughtUpTimeMsUnderlying to the current time.
-     * This means that the replica is fully caught up.
-     */
-    if(logReadResult.isReadFromLogEnd) {
-      lastCaughtUpTimeMsUnderlying.set(time.milliseconds)
-    }
+  def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long) {
+    lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
+    lastFetchTimeMs = curTimeMs
+    lastCaughtUpTimeMsUnderlying.set(lastCaughtUpTimeMs)
   }
 
   private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
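
The update rule above is easiest to see as a small state machine. A standalone sketch of the lastCaughtUpTimeMs bookkeeping (hypothetical names, made-up offsets and times; it mirrors the two branches of updateLogReadResult rather than reusing the Kafka classes):

    object LastCaughtUpSketch extends App {
      case class State(lastCaughtUpTimeMs: Long = 0L,
                       lastFetchTimeMs: Long = 0L,
                       lastFetchLeaderLogEndOffset: Long = 0L)

      // Credit the current fetch time if the fetch reached the current leader LEO; otherwise
      // credit the previous fetch time if the fetch reached the LEO the leader had when the
      // previous fetch was served; otherwise leave lastCaughtUpTimeMs unchanged.
      def onFetch(s: State, fetchOffset: Long, leaderLEO: Long, fetchTimeMs: Long): State = {
        val caughtUp =
          if (fetchOffset >= leaderLEO) fetchTimeMs
          else if (fetchOffset >= s.lastFetchLeaderLogEndOffset) s.lastFetchTimeMs
          else s.lastCaughtUpTimeMs
        State(caughtUp, fetchTimeMs, leaderLEO)
      }

      var s = State()
      s = onFetch(s, fetchOffset = 10, leaderLEO = 15, fetchTimeMs = 1000) // still behind
      s = onFetch(s, fetchOffset = 15, leaderLEO = 20, fetchTimeMs = 2000) // reached the LEO seen at t=1000
      println(s.lastCaughtUpTimeMs)  // 1000
      s = onFetch(s, fetchOffset = 20, leaderLEO = 20, fetchTimeMs = 3000) // fully caught up
      println(s.lastCaughtUpTimeMs)  // 3000
    }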

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc88bac6/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6dfe97f..859a7c4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -64,8 +64,9 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None)
  */
 case class LogReadResult(info: FetchDataInfo,
                          hw: Long,
+                         leaderLogEndOffset: Long,
+                         fetchTimeMs: Long,
                          readSize: Int,
-                         isReadFromLogEnd : Boolean,
                          error: Option[Throwable] = None) {
 
   def errorCode = error match {
@@ -74,19 +75,19 @@ case class LogReadResult(info: FetchDataInfo,
   }
 
   override def toString = {
-    "Fetch Data: [%s], HW: [%d], readSize: [%d], isReadFromLogEnd: [%b], error: [%s]"
-            .format(info, hw, readSize, isReadFromLogEnd, error)
+    "Fetch Data: [%s], HW: [%d], leaderLogEndOffset: [%d], readSize: [%d], error: [%s]"
+            .format(info, hw, leaderLogEndOffset, readSize, error)
   }
 }
 
 case class FetchPartitionData(error: Short = Errors.NONE.code, hw: Long = -1L, records: Records)
 
 object LogReadResult {
-  val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata,
-                                                         MemoryRecords.EMPTY),
-                                           -1L,
-                                           -1,
-                                           false)
+  val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+                                           hw = -1L,
+                                           leaderLogEndOffset = -1L,
+                                           fetchTimeMs = -1L,
+                                           readSize = -1)
 }
 
 case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Short], errorCode: Short) {
@@ -216,7 +217,8 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startup() {
     // start ISR expiration thread
-    scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
+    // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x (1 + 50%) before it is removed from ISR
+    scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
   }
 
@@ -539,7 +541,9 @@ class ReplicaManager(val config: KafkaConfig,
         * where data gets appended to the log immediately after the replica has consumed from it
          * This can cause a replica to always be out of sync.
          */
-        val initialLogEndOffset = localReplica.logEndOffset
+        val initialLogEndOffset = localReplica.logEndOffset.messageOffset
+        val initialHighWatermark = localReplica.highWatermark.messageOffset
+        val fetchTimeMs = time.milliseconds
         val logReadInfo = localReplica.log match {
           case Some(log) =>
             val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)
@@ -561,9 +565,12 @@ class ReplicaManager(val config: KafkaConfig,
             FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
         }
 
-        val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0
-
-        LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, partitionFetchSize, readToEndOfLog, None)
+        LogReadResult(info = logReadInfo,
+                      hw = initialHighWatermark,
+                      leaderLogEndOffset = initialLogEndOffset,
+                      fetchTimeMs = fetchTimeMs,
+                      readSize = partitionFetchSize,
+                      error = None)
       } catch {
         // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
         // is supposed to indicate un-expected failure of a broker in handling a fetch request
@@ -571,14 +578,22 @@ class ReplicaManager(val config: KafkaConfig,
                  _: NotLeaderForPartitionException |
                  _: ReplicaNotAvailableException |
                  _: OffsetOutOfRangeException) =>
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L,
-            partitionFetchSize, false, Some(e))
+          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+                        hw = -1L,
+                        leaderLogEndOffset = -1L,
+                        fetchTimeMs = -1L,
+                        readSize = partitionFetchSize,
+                        error = Some(e))
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
           error(s"Error processing fetch operation on partition $tp, offset $offset", e)
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), -1L,
-            partitionFetchSize, false, Some(e))
+          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+                        hw = -1L,
+                        leaderLogEndOffset = -1L,
+                        fetchTimeMs = -1L,
+                        readSize = partitionFetchSize,
+                        error = Some(e))
       }
     }
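
A quick worked bound behind the "(1 + 50%)" comment above, assuming the default replica.lag.time.max.ms of 10000 ms (a sketch, not broker code): a shrink check can run just before a follower's lag exceeds replicaLagTimeMaxMs, so removal only happens at the following check, giving at most replicaLagTimeMaxMs + period in the ISR after the follower was last caught up.

    object IsrShrinkBoundSketch extends App {
      val replicaLagTimeMaxMs = 10000L              // broker default for replica.lag.time.max.ms
      val checkPeriodMs = replicaLagTimeMaxMs / 2   // new isr-expiration period

      // Worst case: lag crosses replicaLagTimeMaxMs just after a check, so the follower is
      // only removed one full period later.
      val worstCaseMs = replicaLagTimeMaxMs + checkPeriodMs
      println(f"a stalled follower can stay in the ISR for up to $worstCaseMs ms " +
              f"(${worstCaseMs.toDouble / replicaLagTimeMaxMs}%.1f x replica.lag.time.max.ms)")
      // With the old period of replicaLagTimeMaxMs the same bound would be 2.0 x.
    }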
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc88bac6/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 03203ad..0a03cac 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -75,11 +75,12 @@ class IsrExpirationTest {
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
     // let the follower catch up to the Leader logEndOffset (15)
-    (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                                                   -1L,
-                                                   -1,
-                                                   true)))
+    for(replica <- partition0.assignedReplicas() - leaderReplica)
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
+                                                    hw = 15L,
+                                                    leaderLogEndOffset = 15L,
+                                                    fetchTimeMs =time.milliseconds,
+                                                    readSize = -1))
     var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -127,7 +128,11 @@ class IsrExpirationTest {
 
     // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
     for(replica <- partition0.assignedReplicas() - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), -1L, -1, false))
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
+                                                    hw = 10L,
+                                                    leaderLogEndOffset = 15L,
+                                                    fetchTimeMs =time.milliseconds,
+                                                    readSize = -1))
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
@@ -137,7 +142,11 @@ class IsrExpirationTest {
     time.sleep(75)
 
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), -1L, -1, false)))
+      r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
+                                                   hw = 11L,
+                                                   leaderLogEndOffset = 15L,
+                                                   fetchTimeMs =time.milliseconds,
+                                                   readSize = -1)))
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -149,7 +158,11 @@ class IsrExpirationTest {
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), -1L, -1, true)))
+      r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
+                                                   hw = 15L,
+                                                   leaderLogEndOffset = 15L,
+                                                   fetchTimeMs =time.milliseconds,
+                                                   readSize = -1)))
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -166,6 +179,13 @@ class IsrExpirationTest {
     allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
     // set in sync replicas for this partition to all the assigned replicas
     partition.inSyncReplicas = allReplicas.toSet
+    // set lastCaughtUpTime to current time
+    for(replica <- partition.assignedReplicas() - leaderReplica)
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY),
+                                                    hw = 0L,
+                                                    leaderLogEndOffset = 0L,
+                                                    fetchTimeMs = time.milliseconds,
+                                                    readSize = -1))
     // set the leader and its hw and the hw update time
     partition.leaderReplicaIdOpt = Some(leaderId)
     partition

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc88bac6/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 676fd3f..1ce17dc 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -111,7 +111,11 @@ class SimpleFetchTest {
     // create the follower replica with defined log end offset
     val followerReplica= new Replica(configs(1).brokerId, partition, time)
     val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
-    followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY), -1L, -1, true))
+    followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY),
+                                                          hw = leo.messageOffset,
+                                                          leaderLogEndOffset = leo.messageOffset,
+                                                          fetchTimeMs = time.milliseconds,
+                                                          readSize = -1))
 
     // add both of them to ISR
     val allReplicas = List(leaderReplica, followerReplica)

