kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is not defined (#8607)
Date Tue, 05 May 2020 04:39:37 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fbfda2c  KAFKA-9731: Disable immediate fetch response for hw propagation if replica
selector is not defined (#8607)
fbfda2c is described below

commit fbfda2c4ad889c731aa52b5214e0521f187f8db6
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Mon May 4 21:38:53 2020 -0700

    KAFKA-9731: Disable immediate fetch response for hw propagation if replica selector is
not defined (#8607)
    
    In the case described in the JIRA, there was a 50%+ increase in the total fetch request
rate in
    2.4.0 due to this change.
    
    I included a few additional clean-ups:
    * Simplify `findPreferredReadReplica` and avoid unnecessary collection copies.
    * Use `LongSupplier` instead of `Supplier<Long>` in `SubscriptionState` to avoid
unnecessary boxing.
    
    Added a unit test to ReplicaManagerTest and cleaned up the test class a bit including
    consistent usage of Time in MockTimer and other components.
    
    Reviewers: Gwen Shapira <gwen@confluent.io>, David Arthur <mumrah@gmail.com>,
Jason Gustafson <jason@confluent.io>
---
 .../consumer/internals/SubscriptionState.java      |   8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  57 ++++-----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 135 +++++++++++++++------
 .../scala/unit/kafka/utils/timer/MockTimer.scala   |   3 +-
 gradle/spotbugs-exclude.xml                        |   7 ++
 5 files changed, 137 insertions(+), 73 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6568c91..5b375da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -39,8 +39,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.LongSupplier;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -516,7 +516,7 @@ public class SubscriptionState {
      * @param preferredReadReplicaId The preferred read replica
      * @param timeMs The time at which this preferred replica is no longer valid
      */
-    public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId,
Supplier<Long> timeMs) {
+    public synchronized void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId,
LongSupplier timeMs) {
         assignedState(tp).updatePreferredReadReplica(preferredReadReplicaId, timeMs);
     }
 
@@ -721,10 +721,10 @@ public class SubscriptionState {
             }
         }
 
-        private void updatePreferredReadReplica(int preferredReadReplica, Supplier<Long>
timeMs) {
+        private void updatePreferredReadReplica(int preferredReadReplica, LongSupplier timeMs)
{
             if (this.preferredReadReplica == null || preferredReadReplica != this.preferredReadReplica)
{
                 this.preferredReadReplica = preferredReadReplica;
-                this.preferredReadReplicaExpireTimeMs = timeMs.get();
+                this.preferredReadReplicaExpireTimeMs = timeMs.getAsLong();
             }
         }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5bdc3d2..b387785 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1053,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig,
           metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset,
fetchTimeMs))
 
         if (preferredReadReplica.isDefined) {
-          replicaSelectorOpt.foreach{ selector =>
+          replicaSelectorOpt.foreach { selector =>
             debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred
replica " +
               s"${preferredReadReplica.get} for $clientMetadata")
           }
@@ -1079,9 +1079,9 @@ class ReplicaManager(val config: KafkaConfig,
             fetchOnlyFromLeader = fetchOnlyFromLeader,
             minOneMessage = minOneMessage)
 
-          // Check if the HW known to the follower is behind the actual HW
-          val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
-            .exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
+          // Check if the HW known to the follower is behind the actual HW if a replica selector
is defined
+          val followerNeedsHwUpdate = replicaSelectorOpt.isDefined &&
+            partition.getReplica(replicaId).exists(replica => replica.lastSentHighWatermark
< readInfo.highWatermark)
 
           val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
             // If the partition is being throttled, simply return an empty set.
@@ -1170,44 +1170,35 @@ class ReplicaManager(val config: KafkaConfig,
                                replicaId: Int,
                                fetchOffset: Long,
                                currentTimeMs: Long): Option[Int] = {
-    if (partition.isLeader) {
-      if (Request.isValidBrokerId(replicaId)) {
-        // Don't look up preferred for follower fetches via normal replication
-        Option.empty
-      } else {
+    partition.leaderReplicaIdOpt.flatMap { leaderReplicaId =>
+      // Don't look up preferred for follower fetches via normal replication
+      if (Request.isValidBrokerId(replicaId))
+        None
+      else {
         replicaSelectorOpt.flatMap { replicaSelector =>
-          val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
new ListenerName(clientMetadata.listenerName))
-          var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas
+          val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
+            new ListenerName(clientMetadata.listenerName))
+          val replicaInfos = partition.remoteReplicas
             // Exclude replicas that don't have the requested offset (whether or not if they're
in the ISR)
-            .filter(replica => replica.logEndOffset >= fetchOffset)
-            .filter(replica => replica.logStartOffset <= fetchOffset)
+            .filter(replica => replica.logEndOffset >= fetchOffset && replica.logStartOffset
<= fetchOffset)
             .map(replica => new DefaultReplicaView(
               replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
               replica.logEndOffset,
               currentTimeMs - replica.lastCaughtUpTimeMs))
-            .toSet
-
-          if (partition.leaderReplicaIdOpt.isDefined) {
-            val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
-              .map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode()))
-              .map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset,
0L))
-              .get
-            replicaInfoSet ++= Set(leaderReplica)
-
-            val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
-            replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala
-              .filter(!_.endpoint.isEmpty)
-              // Even though the replica selector can return the leader, we don't want to
send it out with the
-              // FetchResponse, so we exclude it here
-              .filter(!_.equals(leaderReplica))
-              .map(_.endpoint.id)
-          } else {
-            None
+
+          val leaderReplica = new DefaultReplicaView(
+            replicaEndpoints.getOrElse(leaderReplicaId, Node.noNode()),
+            partition.localLogOrException.logEndOffset, 0L)
+          val replicaInfoSet = mutable.Set[ReplicaView]() ++= replicaInfos += leaderReplica
+
+          val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
+          replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect
{
+            // Even though the replica selector can return the leader, we don't want to send
it out with the
+            // FetchResponse, so we exclude it here
+            case selected if !selected.endpoint.isEmpty && selected != leaderReplica
=> selected.endpoint.id
           }
         }
       }
-    } else {
-      None
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0ed04d5..09e4f14 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -221,7 +221,7 @@ class ReplicaManagerTest {
   }
 
   private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
@@ -277,7 +277,7 @@ class ReplicaManagerTest {
 
   @Test
   def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
-    val timer = new MockTimer
+    val timer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
@@ -337,7 +337,7 @@ class ReplicaManagerTest {
 
   @Test
   def testReadCommittedFetchLimitedAtLSO(): Unit = {
-    val timer = new MockTimer
+    val timer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
@@ -444,7 +444,7 @@ class ReplicaManagerTest {
 
   @Test
   def testDelayedFetchIncludesAbortedTransactions(): Unit = {
-    val timer = new MockTimer
+    val timer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
@@ -521,7 +521,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchBeyondHighWatermark(): Unit = {
-    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0,
1, 2))
+    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds
= Seq(0, 1, 2))
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
@@ -579,7 +579,7 @@ class ReplicaManagerTest {
     val maxFetchBytes = 1024 * 1024
     val aliveBrokersIds = Seq(0, 1)
     val leaderEpoch = 5
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokersIds)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokersIds)
     try {
       val tp = new TopicPartition(topic, 0)
       val replicas = aliveBrokersIds.toList.map(Int.box).asJava
@@ -677,7 +677,7 @@ class ReplicaManagerTest {
    */
   @Test
   def testFetchMessagesWhenNotFollowerForOnePartition(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds
= Seq(0, 1, 2))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds
= Seq(0, 1, 2))
 
     try {
       // Create 2 partitions, assign replica 0 as the leader for both a different follower
(1 and 2) for each
@@ -791,8 +791,9 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
-      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId,
countDownLatch, expectTruncation = true)
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId,
countDownLatch,
+      expectTruncation = true, localLogOffset = Some(10))
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
     val tp = new TopicPartition(topic, topicPartition)
@@ -830,7 +831,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -863,7 +864,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -912,7 +913,7 @@ class ReplicaManagerTest {
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
 
@@ -951,6 +952,70 @@ class ReplicaManagerTest {
     assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
   }
 
+  @Test
+  def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = {
+    val topicPartition = 0
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+    val timer = new MockTimer(time)
+
+    // Prepare the mocked components for the test
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(timer,
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true)
+
+    val brokerList = Seq[Integer](0, 1).asJava
+
+    val tp0 = new TopicPartition(topic, 0)
+
+    replicaManager.createPartition(new TopicPartition(topic, 0))
+
+    // Make this replica the follower
+    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(0)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(brokerList)
+        .setZkVersion(0)
+        .setReplicas(brokerList)
+        .setIsNew(false)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+
+    val simpleRecords = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))
+    val appendResult = appendRecords(replicaManager, tp0,
+      MemoryRecords.withRecords(CompressionType.NONE, simpleRecords.toSeq: _*), AppendOrigin.Client)
+
+    // Increment the hw in the leader by fetching from the last offset
+    val fetchOffset = simpleRecords.size
+    var followerResult = fetchAsFollower(replicaManager, tp0,
+      new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
+      clientMetadata = None)
+    assertTrue(followerResult.isFired)
+    assertEquals(0, followerResult.assertFired.highWatermark)
+
+    assertTrue("Expected producer request to be acked", appendResult.isFired)
+
+    // Fetch from the same offset, no new data is expected and hence the fetch request should
+    // go to the purgatory
+    followerResult = fetchAsFollower(replicaManager, tp0,
+      new PartitionData(fetchOffset, 0, 100000, Optional.empty()),
+      clientMetadata = None, minBytes = 1000)
+    assertFalse("Request completed immediately unexpectedly", followerResult.isFired)
+
+    // Complete the request in the purgatory by advancing the clock
+    timer.advanceClock(1001)
+    assertTrue(followerResult.isFired)
+
+    assertEquals(fetchOffset, followerResult.assertFired.highWatermark)
+  }
+
   @Test(expected = classOf[ClassNotFoundException])
   def testUnknownReplicaSelector(): Unit = {
     val topicPartition = 0
@@ -962,7 +1027,7 @@ class ReplicaManagerTest {
 
     val props = new Properties()
     props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
-    prepareReplicaManagerAndLogManager(
+    prepareReplicaManagerAndLogManager(new MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
   }
@@ -976,7 +1041,7 @@ class ReplicaManagerTest {
     val leaderEpochIncrement = 2
     val countDownLatch = new CountDownLatch(1)
 
-    val (replicaManager, _) = prepareReplicaManagerAndLogManager(
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true)
     assertFalse(replicaManager.replicaSelectorOpt.isDefined)
@@ -984,7 +1049,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchFollowerNotAllowedForOlderClients(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds
= Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@@ -1022,7 +1087,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchRequestRateMetrics(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1067,7 +1132,7 @@ class ReplicaManagerTest {
 
   @Test
   def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1115,7 +1180,7 @@ class ReplicaManagerTest {
 
   @Test
   def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1164,7 +1229,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchFromLeaderAlwaysAllowed(): Unit = {
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds
= Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
@@ -1205,7 +1270,7 @@ class ReplicaManagerTest {
     // In this case, we should ensure that pending purgatory operations are cancelled
     // immediately rather than sitting around to timeout.
 
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1244,7 +1309,7 @@ class ReplicaManagerTest {
 
   @Test
   def testClearProducePurgatoryOnStopReplica(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1329,22 +1394,22 @@ class ReplicaManagerTest {
    * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
    * 'leaderEpochInLeaderAndIsr' leader epoch for partition 'topicPartition'.
    */
-  private def prepareReplicaManagerAndLogManager(topicPartition: Int,
+  private def prepareReplicaManagerAndLogManager(timer: MockTimer,
+                                                 topicPartition: Int,
                                                  leaderEpochInLeaderAndIsr: Int,
                                                  followerBrokerId: Int,
                                                  leaderBrokerId: Int,
                                                  countDownLatch: CountDownLatch,
                                                  expectTruncation: Boolean,
+                                                 localLogOffset: Option[Long] = None,
+                                                 offsetFromLeader: Long = 5,
+                                                 leaderEpochFromLeader: Int = 3,
                                                  extraProps: Properties = new Properties())
: (ReplicaManager, LogManager) = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     props.asScala ++= extraProps.asScala
     val config = KafkaConfig.fromProps(props)
 
-    // Setup mock local log to have leader epoch of 3 and offset of 10
-    val localLogOffset = 10
-    val offsetFromLeader = 5
-    val leaderEpochFromLeader = 3
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@@ -1365,14 +1430,17 @@ class ReplicaManagerTest {
 
       override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
         assertEquals(leaderEpoch, leaderEpochFromLeader)
-        Some(OffsetAndEpoch(localLogOffset, leaderEpochFromLeader))
+        localLogOffset.map { logOffset =>
+          Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader))
+        }.getOrElse(super.endOffsetForEpoch(leaderEpoch))
       }
 
       override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader)
 
-      override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
+      override def logEndOffsetMetadata: LogOffsetMetadata =
+        localLogOffset.map(LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata)
 
-      override def logEndOffset: Long = localLogOffset
+      override def logEndOffset: Long = localLogOffset.getOrElse(super.logEndOffset)
     }
 
     // Expect to call LogManager.truncateTo exactly once
@@ -1414,7 +1482,6 @@ class ReplicaManagerTest {
       .anyTimes()
     EasyMock.replay(metadataCache)
 
-    val timer = new MockTimer
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)
     val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@@ -1822,7 +1889,7 @@ class ReplicaManagerTest {
 
   @Test
   def testStopReplicaWithStaleControllerEpoch(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1848,7 +1915,7 @@ class ReplicaManagerTest {
 
   @Test
   def testStopReplicaWithOfflinePartition(): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1890,7 +1957,7 @@ class ReplicaManagerTest {
   }
 
   private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean, throwIOException:
Boolean): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
@@ -1983,7 +2050,7 @@ class ReplicaManagerTest {
                                                    deletePartition: Boolean,
                                                    throwIOException: Boolean,
                                                    expectedOutput: Errors): Unit = {
-    val mockTimer = new MockTimer
+    val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
index 8805b11..819954a 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
@@ -20,9 +20,8 @@ import kafka.utils.MockTime
 
 import scala.collection.mutable
 
-class MockTimer extends Timer {
+class MockTimer(val time: MockTime = new MockTime) extends Timer {
 
-  val time = new MockTime
   private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse)
 
   def add(timerTask: TimerTask): Unit = {
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 86cc464..6e9a6c1 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -156,6 +156,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
 
     <Match>
         <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12,
fixed in 2.13 -->
+        <Source name="ReplicaManager.scala"/>
+        <Package name="kafka.server"/>
+        <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
+    </Match>
+
+    <Match>
+        <!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12,
fixed in 2.13 -->
         <Source name="LogManager.scala"/>
         <Package name="kafka.log"/>
         <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>


Mime
View raw message