kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 3.0 updated: KAFKA-13161; Update replica partition state and replica fetcher state on follower update (#11189)
Date Tue, 10 Aug 2021 19:34:48 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new fb241c1  KAFKA-13161; Update replica partition state and replica fetcher state on
follower update (#11189)
fb241c1 is described below

commit fb241c1aef0339427701e5b1e65fd134279b6203
Author: José Armando García Sancio <jsancio@users.noreply.github.com>
AuthorDate: Tue Aug 10 12:06:30 2021 -0700

    KAFKA-13161; Update replica partition state and replica fetcher state on follower update
(#11189)
    
    When processing the topics delta, make sure that the replica manager partition state and
replica fetcher state matches the information included in the topic delta. Also ensure that
delayed operations are processed after the follower state change has been made since that
is what allows them to be completed.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../main/scala/kafka/server/DelayedProduce.scala   |  13 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  53 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 741 +++++++++++++++------
 .../org/apache/kafka/image/ClusterImageTest.java   |   2 +-
 4 files changed, 570 insertions(+), 239 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 964da37..5e7e7bf 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -74,11 +74,12 @@ class DelayedProduce(delayMs: Long,
    * The delayed produce operation can be completed if every partition
    * it produces to is satisfied by one of the following:
    *
-   * Case A: This broker is no longer the leader: set an error in response
-   * Case B: This broker is the leader:
-   *   B.1 - If there was a local error thrown while checking if at least requiredAcks
+   * Case A: Replica not assigned to partition
+   * Case B: Replica is no longer the leader of this partition
+   * Case C: This broker is the leader:
+   *   C.1 - If there was a local error thrown while checking if at least requiredAcks
    *         replicas have caught up to this operation: set an error in response
-   *   B.2 - Otherwise, set the response with no error.
+   *   C.2 - Otherwise, set the response with no error.
    */
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
@@ -95,7 +96,7 @@ class DelayedProduce(delayMs: Long,
             partition.checkEnoughReplicasReachOffset(status.requiredOffset)
         }
 
-        // Case B.1 || B.2
+        // Case B || C.1 || C.2
         if (error != Errors.NONE || hasEnough) {
           status.acksPending = false
           status.responseStatus.error = error
@@ -103,7 +104,7 @@ class DelayedProduce(delayMs: Long,
       }
     }
 
-    // check if every partition has satisfied at least one of case A or B
+    // check if every partition has satisfied at least one of case A, B or C
     if (!produceMetadata.produceStatus.values.exists(_.acksPending))
       forceComplete()
     else
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 84ccd8e..2c2db89 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2129,8 +2129,8 @@ class ReplicaManager(val config: KafkaConfig,
           if (localLog(tp).isEmpty)
             markPartitionOffline(tp)
         }
-        newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded(_))
-        newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded(_))
+        newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded)
+        newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded)
 
         replicaFetcherManager.shutdownIdleFetcherThreads()
         replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
@@ -2174,7 +2174,6 @@ class ReplicaManager(val config: KafkaConfig,
                                        newLocalFollowers: mutable.HashMap[TopicPartition,
LocalLeaderInfo]): Unit = {
     stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
       "local followers.")
-    replicaFetcherManager.removeFetcherForPartitions(newLocalFollowers.keySet)
     val shuttingDown = isShuttingDown.get()
     val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, InitialFetchState]
     val newFollowerTopicSet = new mutable.HashSet[String]
@@ -2183,13 +2182,6 @@ class ReplicaManager(val config: KafkaConfig,
         try {
           newFollowerTopicSet.add(tp.topic())
 
-          completeDelayedFetchOrProduceRequests(tp)
-
-          // Create the local replica even if the leader is unavailable. This is required
-          // to ensure that we include the partition's high watermark in the checkpoint
-          // file (see KAFKA-1647)
-          partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
-
           if (shuttingDown) {
             stateChangeLogger.trace(s"Unable to start fetching ${tp} with topic " +
               s"ID ${info.topicId} because the replica manager is shutting down.")
@@ -2197,15 +2189,30 @@ class ReplicaManager(val config: KafkaConfig,
             val listenerName = config.interBrokerListenerName.value()
             val leader = info.partition.leader
             Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala)
match {
-              case None => stateChangeLogger.trace(s"Unable to start fetching ${tp} "
+
-                s"with topic ID ${info.topicId} from leader ${leader} because it is not "
+
-                "alive.")
+              case None =>
+                stateChangeLogger.trace(
+                  s"Unable to start fetching ${tp} with topic ID ${info.topicId} from leader
" +
+                  s"${leader} because it is not alive."
+                )
+
+                // Create the local replica even if the leader is unavailable. This is required
+                // to ensure that we include the partition's high watermark in the checkpoint
+                // file (see KAFKA-1647)
+                partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
               case Some(node) =>
-                val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
-                val log = partition.localLogOrException
-                val fetchOffset = initialFetchOffset(log)
-                partitionsToMakeFollower.put(tp,
-                  InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
+                val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+                if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId)))
{
+                  val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
+                  val log = partition.localLogOrException
+                  val fetchOffset = initialFetchOffset(log)
+                  partitionsToMakeFollower.put(tp,
+                    InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
+                } else {
+                  stateChangeLogger.info(
+                    s"Skipped the become-follower state change after marking its partition
as " +
+                    s"follower for partition $tp with id ${info.topicId} and partition state
$state."
+                  )
+                }
             }
           }
           changedPartitions.add(partition)
@@ -2216,8 +2223,16 @@ class ReplicaManager(val config: KafkaConfig,
         }
       }
     }
-    updateLeaderAndFollowerMetrics(newFollowerTopicSet)
+
+    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
+    stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size}
partitions")
+
     replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
+    stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size}
partitions")
+
+    partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
+
+    updateLeaderAndFollowerMetrics(newFollowerTopicSet)
   }
 
   def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index cab00c0..68baafa 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -17,6 +17,14 @@
 
 package kafka.server
 
+import java.io.File
+import java.net.InetAddress
+import java.nio.file.Files
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.stream.IntStream
+import java.util.{Collections, Optional, Properties}
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log._
@@ -30,7 +38,9 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
+import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
@@ -41,25 +51,14 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
+import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage,
FeaturesImage, MetadataImage, TopicImage, TopicsDelta, TopicsImage }
+import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
 import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.mockito.{ArgumentMatchers, Mockito}
-
-import java.io.File
-import java.net.InetAddress
-import java.nio.file.Files
-import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Collections, Optional, Properties}
-import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.image.{TopicImage, TopicsDelta, TopicsImage}
-import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
-
+import org.mockito.{ArgumentMatchers, Mockito}
 import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 
@@ -864,31 +863,37 @@ class ReplicaManagerTest {
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId,
countDownLatch,
       expectTruncation = expectTruncation, localLogOffset = Some(10), extraProps = extraProps,
topicId = Some(topicId))
 
-    // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
-    val tp = new TopicPartition(topic, topicPartition)
-    val partition = replicaManager.createPartition(tp)
-    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints,
None)
-    partition.makeFollower(
-      leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
-      offsetCheckpoints,
-      None)
-
-    // Make local partition a follower - because epoch increased by more than 1, truncation
should
-    // trigger even though leader does not change
-    leaderEpoch += leaderEpochIncrement
-    val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
-      controllerId, controllerEpoch, brokerEpoch,
-      Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
-      Collections.singletonMap(topic, topicId),
-      Set(new Node(followerBrokerId, "host1", 0),
-        new Node(leaderBrokerId, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
-      (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId))
-    assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
+    try {
+      // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
+      val tp = new TopicPartition(topic, topicPartition)
+      val partition = replicaManager.createPartition(tp)
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints,
None)
+      partition.makeFollower(
+        leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
+        offsetCheckpoints,
+        None)
+
+      // Make local partition a follower - because epoch increased by more than 1, truncation
should
+      // trigger even though leader does not change
+      leaderEpoch += leaderEpochIncrement
+      val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+        controllerId, controllerEpoch, brokerEpoch,
+        Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(followerBrokerId, "host1", 0),
+          new Node(leaderBrokerId, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
+        (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId))
+      assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
+
+      // Truncation should have happened once
+      EasyMock.verify(mockLogMgr)
+    } finally {
+      replicaManager.shutdown()
+    }
 
-    // Truncation should have happened once
-    EasyMock.verify(mockLogMgr)
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @Test
@@ -940,40 +945,46 @@ class ReplicaManagerTest {
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId))
 
-    val brokerList = Seq[Integer](0, 1).asJava
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
 
-    val tp0 = new TopicPartition(topic, 0)
+      val tp0 = new TopicPartition(topic, 0)
 
-    initializeLogAndTopicId(replicaManager, tp0, topicId)
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
 
-    // Make this replica the follower
-    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
-      Seq(new LeaderAndIsrPartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(0)
-        .setLeader(1)
-        .setLeaderEpoch(1)
-        .setIsr(brokerList)
-        .setZkVersion(0)
-        .setReplicas(brokerList)
-        .setIsNew(false)).asJava,
-      Collections.singletonMap(topic, topicId),
-      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(1)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
-    val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
-      InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
 
-    val consumerResult = fetchAsConsumer(replicaManager, tp0,
-      new PartitionData(0, 0, 100000, Optional.empty()),
-      clientMetadata = Some(metadata))
+      val consumerResult = fetchAsConsumer(replicaManager, tp0,
+        new PartitionData(0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata))
 
-    // Fetch from follower succeeds
-    assertTrue(consumerResult.isFired)
+      // Fetch from follower succeeds
+      assertTrue(consumerResult.isFired)
 
-    // But only leader will compute preferred replica
-    assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
+      // But only leader will compute preferred replica
+      assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @Test
@@ -991,40 +1002,46 @@ class ReplicaManagerTest {
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId))
 
-    val brokerList = Seq[Integer](0, 1).asJava
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
 
-    val tp0 = new TopicPartition(topic, 0)
+      val tp0 = new TopicPartition(topic, 0)
 
-    initializeLogAndTopicId(replicaManager, tp0, topicId)
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
 
-    // Make this replica the follower
-    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
-      Seq(new LeaderAndIsrPartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(1)
-        .setIsr(brokerList)
-        .setZkVersion(0)
-        .setReplicas(brokerList)
-        .setIsNew(false)).asJava,
-      Collections.singletonMap(topic, topicId),
-      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
-    val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
-      InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchAsConsumer(replicaManager, tp0,
+        new PartitionData(0, 0, 100000, Optional.empty()),
+        clientMetadata = Some(metadata))
 
-    val consumerResult = fetchAsConsumer(replicaManager, tp0,
-      new PartitionData(0, 0, 100000, Optional.empty()),
-      clientMetadata = Some(metadata))
+      // Fetch from follower succeeds
+      assertTrue(consumerResult.isFired)
 
-    // Fetch from follower succeeds
-    assertTrue(consumerResult.isFired)
+      // Returns a preferred replica (should just be the leader, which is None)
+      assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
+    } finally {
+      replicaManager.shutdown()
+    }
 
-    // Returns a preferred replica (should just be the leader, which is None)
-    assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @Test
@@ -1136,40 +1153,45 @@ class ReplicaManagerTest {
   @Test
   def testFetchFollowerNotAllowedForOlderClients(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds
= Seq(0, 1))
+    try {
+      val tp0 = new TopicPartition(topic, 0)
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica
= false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(1)
+          .setLeaderEpoch(0)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+        Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
+
+      // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
+      val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS,
"")
+      var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+        Optional.of(0))
+      var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
+      assertNotNull(fetchResult.get)
+      assertEquals(Errors.NONE, fetchResult.get.error)
+
+      // Fetch from follower, with empty ClientMetadata (which implies an older version)
+      partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+        Optional.of(0))
+      fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
+      assertNotNull(fetchResult.get)
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
+    } finally {
+      replicaManager.shutdown()
+    }
 
-    val tp0 = new TopicPartition(topic, 0)
-    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica
= false, offsetCheckpoints, None)
-    val partition0Replicas = Seq[Integer](0, 1).asJava
-    val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
-      Seq(new LeaderAndIsrPartitionState()
-        .setTopicName(tp0.topic)
-        .setPartitionIndex(tp0.partition)
-        .setControllerEpoch(0)
-        .setLeader(1)
-        .setLeaderEpoch(0)
-        .setIsr(partition0Replicas)
-        .setZkVersion(0)
-        .setReplicas(partition0Replicas)
-        .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
-      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
-
-    // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
-    val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS,
"")
-    var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
-      Optional.of(0))
-    var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
-    assertNotNull(fetchResult.get)
-    assertEquals(Errors.NONE, fetchResult.get.error)
-
-    // Fetch from follower, with empty ClientMetadata (which implies an older version)
-    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
-      Optional.of(0))
-    fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
-    assertNotNull(fetchResult.get)
-    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @Test
@@ -1223,50 +1245,56 @@ class ReplicaManagerTest {
     val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
-    val tp0 = new TopicPartition(topic, 0)
-    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica
= false, offsetCheckpoints, None)
-    val partition0Replicas = Seq[Integer](0, 1).asJava
-    val topicId = Uuid.randomUuid()
+    try {
+      val tp0 = new TopicPartition(topic, 0)
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica
= false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicId = Uuid.randomUuid()
 
-    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
-      Seq(new LeaderAndIsrPartitionState()
-        .setTopicName(tp0.topic)
-        .setPartitionIndex(tp0.partition)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(1)
-        .setIsr(partition0Replicas)
-        .setZkVersion(0)
-        .setReplicas(partition0Replicas)
-        .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, topicId),
-      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+        Collections.singletonMap(tp0.topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
-    val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
-      Optional.empty())
-    val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout
= 10)
-    assertNull(fetchResult.get)
+      val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+        Optional.empty())
+      val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout
= 10)
+      assertNull(fetchResult.get)
 
-    // Become a follower and ensure that the delayed fetch returns immediately
-    val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
-      Seq(new LeaderAndIsrPartitionState()
-        .setTopicName(tp0.topic)
-        .setPartitionIndex(tp0.partition)
-        .setControllerEpoch(0)
-        .setLeader(1)
-        .setLeaderEpoch(2)
-        .setIsr(partition0Replicas)
-        .setZkVersion(0)
-        .setReplicas(partition0Replicas)
-        .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, topicId),
-      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
+      // Become a follower and ensure that the delayed fetch returns immediately
+      val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(1)
+          .setLeaderEpoch(2)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+        Collections.singletonMap(tp0.topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
-    assertNotNull(fetchResult.get)
-    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
+      assertNotNull(fetchResult.get)
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @Test
@@ -1274,51 +1302,57 @@ class ReplicaManagerTest {
     val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds
= Seq(0, 1))
 
-    val tp0 = new TopicPartition(topic, 0)
-    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica
= false, offsetCheckpoints, None)
-    val partition0Replicas = Seq[Integer](0, 1).asJava
-    val topicId = Uuid.randomUuid()
+    try {
+      val tp0 = new TopicPartition(topic, 0)
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica
= false, offsetCheckpoints, None)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val topicId = Uuid.randomUuid()
 
-    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
-      Seq(new LeaderAndIsrPartitionState()
-        .setTopicName(tp0.topic)
-        .setPartitionIndex(tp0.partition)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(1)
-        .setIsr(partition0Replicas)
-        .setZkVersion(0)
-        .setReplicas(partition0Replicas)
-        .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, topicId),
-      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+        Collections.singletonMap(tp0.topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
-    val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS,
"")
-    val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
-      Optional.of(1))
-    val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata),
timeout = 10)
-    assertNull(fetchResult.get)
+      val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS,
"")
+      val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+        Optional.of(1))
+      val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata),
timeout = 10)
+      assertNull(fetchResult.get)
 
-    // Become a follower and ensure that the delayed fetch returns immediately
-    val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
-      Seq(new LeaderAndIsrPartitionState()
-        .setTopicName(tp0.topic)
-        .setPartitionIndex(tp0.partition)
-        .setControllerEpoch(0)
-        .setLeader(1)
-        .setLeaderEpoch(2)
-        .setIsr(partition0Replicas)
-        .setZkVersion(0)
-        .setReplicas(partition0Replicas)
-        .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, topicId),
-      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-    replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
+      // Become a follower and ensure that the delayed fetch returns immediately
+      val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(1)
+          .setLeaderEpoch(2)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+        Collections.singletonMap(tp0.topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
-    assertNotNull(fetchResult.get)
-    assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.get.error)
+      assertNotNull(fetchResult.get)
+      assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.get.error)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
   @Test
@@ -1431,7 +1465,7 @@ class ReplicaManagerTest {
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
-    val produceResult = sendProducerAppend(replicaManager, tp0)
+    val produceResult = sendProducerAppend(replicaManager, tp0, 3)
     assertNull(produceResult.get)
 
     Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
@@ -1446,17 +1480,22 @@ class ReplicaManagerTest {
     assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResult.get.error)
   }
 
-  private def sendProducerAppend(replicaManager: ReplicaManager,
-                                 topicPartition: TopicPartition): AtomicReference[PartitionResponse]
= {
+  private def sendProducerAppend(
+    replicaManager: ReplicaManager,
+    topicPartition: TopicPartition,
+    numOfRecords: Int
+  ): AtomicReference[PartitionResponse] = {
     val produceResult = new AtomicReference[PartitionResponse]()
     def callback(response: Map[TopicPartition, PartitionResponse]): Unit = {
       produceResult.set(response(topicPartition))
     }
 
-    val records = MemoryRecords.withRecords(CompressionType.NONE,
-      new SimpleRecord("a".getBytes()),
-      new SimpleRecord("b".getBytes()),
-      new SimpleRecord("c".getBytes())
+    val records = MemoryRecords.withRecords(
+      CompressionType.NONE,
+      IntStream
+        .range(0, numOfRecords)
+        .mapToObj(i => new SimpleRecord(i.toString.getBytes))
+        .toArray(Array.ofDim[SimpleRecord]): _*
     )
 
     replicaManager.appendRecords(
@@ -2690,4 +2729,280 @@ class ReplicaManagerTest {
           Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testDeltaFromLeaderToFollower(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val numOfRecords = 3
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the leader
+      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
+      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(epoch, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+      // Send a produce request and advance the highwatermark
+      val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
+      fetchMessages(
+        replicaManager,
+        otherId,
+        topicPartition,
+        new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()),
+        Int.MaxValue,
+        IsolationLevel.READ_UNCOMMITTED,
+        None
+      )
+      assertEquals(Errors.NONE, leaderResponse.get.error)
+
+      // Change the local replica to follower
+      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
+      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch
+ 1))
+
+      // Append on a follower should fail
+      val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(epoch + 1, followerPartition.getLeaderEpoch)
+
+      val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFromFollowerToLeader(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val numOfRecords = 3
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch))
+      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(epoch, followerPartition.getLeaderEpoch)
+
+      val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
+
+      // Append on a follower should fail
+      val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
+
+      // Change the local replica to leader
+      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch + 1))
+      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch + 1))
+
+      // Send a produce request and advance the highwatermark
+      val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
+      fetchMessages(
+        replicaManager,
+        otherId,
+        topicPartition,
+        new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()),
+        Int.MaxValue,
+        IsolationLevel.READ_UNCOMMITTED,
+        None
+      )
+      assertEquals(Errors.NONE, leaderResponse.get.error)
+
+      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(epoch + 1, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFollowerWithNoChange(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch))
+      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(epoch, followerPartition.getLeaderEpoch)
+
+      val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
+
+      // Apply the same delta again
+      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
+
+      // Check that the state stays the same
+      val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(noChangePartition.isLeader)
+      assertEquals(epoch, noChangePartition.getLeaderEpoch)
+
+      val noChangeFetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), noChangeFetcher.map(_.sourceBroker))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaToFollowerCompletesProduce(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val numOfRecords = 3
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the leader
+      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
+      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(epoch, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+      // Send a produce request
+      val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
+
+      // Change the local replica to follower
+      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
+      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch
+ 1))
+
+      // Check that the produce failed because it changed to follower before replicating
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaToFollowerCompletesFetch(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the leader
+      val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
+      replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
+      assertTrue(leaderPartition.isLeader)
+      assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+      assertEquals(epoch, leaderPartition.getLeaderEpoch)
+
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+      // Send a fetch request
+      val fetchCallback = fetchMessages(
+        replicaManager,
+        otherId,
+        topicPartition,
+        new PartitionData(0, 0, Int.MaxValue, Optional.empty()),
+        Int.MaxValue,
+        IsolationLevel.READ_UNCOMMITTED,
+        None
+      )
+
+      // Change the local replica to follower
+      val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
+      replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch
+ 1))
+
+      // Check that the produce failed because it changed to follower before replicating
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  private def topicsImage(replica: Int, isLeader: Boolean, epoch: Int): TopicsImage = {
+    val leader = if (isLeader) replica else replica + 1
+    val topicsById = new util.HashMap[Uuid, TopicImage]()
+    val topicsByName = new util.HashMap[String, TopicImage]()
+    val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
+    fooPartitions.put(0, new PartitionRegistration(Array(replica, replica + 1),
+      Array(replica, replica + 1), Replicas.NONE, Replicas.NONE, leader, epoch, epoch))
+    val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
+
+    topicsById.put(FOO_UUID, foo)
+    topicsByName.put("foo", foo)
+
+    new TopicsImage(topicsById, topicsByName)
+  }
+
+  private def topicsDelta(replica: Int, isLeader: Boolean, epoch: Int): TopicsDelta = {
+    val leader = if (isLeader) replica else replica + 1
+    val delta = new TopicsDelta(TopicsImage.EMPTY)
+    delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
+    delta.replay(new PartitionRecord().setPartitionId(0).
+      setTopicId(FOO_UUID).
+      setReplicas(util.Arrays.asList(replica, replica + 1)).
+      setIsr(util.Arrays.asList(replica, replica + 1)).
+      setRemovingReplicas(Collections.emptyList()).
+      setAddingReplicas(Collections.emptyList()).
+      setLeader(leader).
+      setLeaderEpoch(epoch).
+      setPartitionEpoch(epoch))
+
+    delta
+  }
+
+  private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
+    new MetadataImage(
+      FeaturesImage.EMPTY,
+      ClusterImageTest.IMAGE1,
+      topicsImage,
+      ConfigurationsImage.EMPTY,
+      ClientQuotasImage.EMPTY
+    )
+  }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index 703a6e1..6908cf2 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class ClusterImageTest {
-    final static ClusterImage IMAGE1;
+    public final static ClusterImage IMAGE1;
 
     static final List<ApiMessageAndVersion> DELTA1_RECORDS;
 

Mime
View raw message