kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] 01/02: KAFKA-9004; Prevent older clients from fetching from a follower (#7531)
Date: Thu, 17 Oct 2019 17:10:31 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0fbf012abda9e9b446c84dd66f605c7f5a0b8a8d
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Thu Oct 17 12:44:14 2019 -0400

    KAFKA-9004; Prevent older clients from fetching from a follower (#7531)
    
    With KIP-392, we allow consumers to fetch from followers. This capability is enabled
    when a replica selector has been provided in the configuration. When not in use, the
    intent is to preserve the current behavior of fetching only from the leader. The
    leader epoch is the mechanism that keeps us honest: when there is a leader change,
    the epoch gets bumped, consumer fetches fail due to the fenced epoch, and the
    consumer discovers the new leader.
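
    For context, follower fetching under KIP-392 is opt-in on both sides. Below is a
    minimal configuration sketch, assuming a rack-aware deployment; the property names
    are standard Kafka configs, but the rack value is an example and none of this is
    part of the patch itself:

    import java.util.Properties

    object Kip392ConfigSketch {
      // Broker side: providing a replica selector enables follower fetching.
      // RackAwareReplicaSelector ships with Kafka.
      val brokerProps = new Properties()
      brokerProps.put("replica.selector.class",
        "org.apache.kafka.common.replica.RackAwareReplicaSelector")

      // Consumer side: advertise the client's rack so the selector can match it
      // against broker.rack on the brokers. "us-east-1a" is illustrative.
      val consumerProps = new Properties()
      consumerProps.put("client.rack", "us-east-1a")
    }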
    
    However, for old consumers there is no similar protection. The leader epoch was not
    available to clients until recently. If there is a preferred leader election (for
    example), an old consumer will happily continue fetching from the demoted leader
    until a periodic metadata fetch causes it to discover the new leader. This does not
    create any problems from a correctness perspective, since fetches are still bound by
    the high watermark, but it is surprising and may cause unexpected performance
    characteristics.
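
    The stale-fetch window described above is bounded by the consumer's periodic
    metadata refresh. A hedged sketch of tightening that bound with a plain Java-client
    consumer; the bootstrap server and group id are placeholders:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer

    object StaleLeaderWindowSketch {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092") // placeholder
      props.put("group.id", "demo-group")              // placeholder
      props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
      // A pre-KIP-392 consumer keeps fetching from a demoted leader at most this
      // long before a metadata refresh reveals the new leader (default: 300000 ms).
      props.put("metadata.max.age.ms", "30000")
      val consumer = new KafkaConsumer[String, String](props)
    }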
    
    This patch fixes this problem by enforcing leader-only fetching for older versions
    of the fetch request.
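
    The core of the change is a single predicate, shown in the diff below. A
    self-contained sketch using simplified stand-ins for the ReplicaManager internals;
    the hard-coded sentinel mirrors kafka.api.Request as an assumption:

    object FetchGateSketch extends App {
      val FutureLocalReplicaId = -3 // assumed value of Request.FutureLocalReplicaId

      def fetchOnlyFromLeader(replicaId: Int, hasClientMetadata: Boolean): Boolean = {
        val isFromFollower = replicaId >= 0 // Request.isValidBrokerId: broker ids are non-negative
        val isFromConsumer = !(isFromFollower || replicaId == FutureLocalReplicaId)
        // Followers must read from the leader; consumers may read from a follower
        // only if they sent ClientMetadata (fetch request v11+).
        isFromFollower || (isFromConsumer && !hasClientMetadata)
      }

      // An old consumer (no ClientMetadata) is pinned to the leader:
      assert(fetchOnlyFromLeader(replicaId = -1, hasClientMetadata = false))
      // A v11+ consumer may be served by a follower:
      assert(!fetchOnlyFromLeader(replicaId = -1, hasClientMetadata = true))
    }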
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 12 +--
 .../unit/kafka/server/ReplicaManagerTest.scala     | 94 +++++++++++++++++++++-
 2 files changed, 98 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d10c936..ff39339 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -848,18 +848,20 @@ class ReplicaManager(val config: KafkaConfig,
                     isolationLevel: IsolationLevel,
                     clientMetadata: Option[ClientMetadata]): Unit = {
     val isFromFollower = Request.isValidBrokerId(replicaId)
-
-    val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
+    val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
+    val fetchIsolation = if (!isFromConsumer)
       FetchLogEnd
     else if (isolationLevel == IsolationLevel.READ_COMMITTED)
       FetchTxnCommitted
     else
       FetchHighWatermark
 
+    // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
+    val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
     def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
       val result = readFromLocalLog(
         replicaId = replicaId,
-        fetchOnlyFromLeader = isFromFollower,
+        fetchOnlyFromLeader = fetchOnlyFromLeader,
         fetchIsolation = fetchIsolation,
         fetchMaxBytes = fetchMaxBytes,
         hardMaxBytesLimit = hardMaxBytesLimit,
@@ -917,7 +919,7 @@ class ReplicaManager(val config: KafkaConfig,
          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
+      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
         fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
         maybeUpdateHwAndSendResponse)
@@ -971,7 +973,7 @@ class ReplicaManager(val config: KafkaConfig,
               s"${preferredReadReplica.get} for $clientMetadata")
           }
           // If a preferred read-replica is set, skip the read
-          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false)
+          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
             highWatermark = offsetSnapshot.highWatermark.messageOffset,
             leaderLogStartOffset = offsetSnapshot.logStartOffset,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 484ee2d..ef05fa1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,10 +24,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Optional, Properties}
 
 import kafka.api.Request
-import kafka.cluster.BrokerEndPoint
 import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
-import TestUtils.createBroker
+import kafka.utils.TestUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
@@ -921,6 +919,96 @@ class ReplicaManagerTest {
     assertFalse(replicaManager.replicaSelectorOpt.isDefined)
   }
 
+  @Test
+  def testOlderClientFetchFromLeaderOnly(): Unit = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+    val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(
+        new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(1)
+          .setLeaderEpoch(0)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+    def doFetch(replicaId: Int, partitionData: FetchRequest.PartitionData, clientMetadataOpt: Option[ClientMetadata]):
+        Option[FetchPartitionData] = {
+      var fetchResult: Option[FetchPartitionData] = None
+      def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+        fetchResult = response.headOption.filter(_._1 == tp0).map(_._2)
+      }
+      replicaManager.fetchMessages(
+        timeout = 0L,
+        replicaId = replicaId,
+        fetchMinBytes = 1,
+        fetchMaxBytes = 100,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(tp0 -> partitionData),
+        quota = UnboundedQuota,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        responseCallback = callback,
+        clientMetadata = clientMetadataOpt
+      )
+      fetchResult
+    }
+
+    // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
+    val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
+    var partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(0))
+    var fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NONE)
+
+    // Fetch from follower, with empty ClientMetadata
+    fetchResult = None
+    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(0))
+    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+
+    // Change to a leader, both cases are allowed
+    val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(
+        new LeaderAndIsrPartitionState()
+          .setTopicName(tp0.topic)
+          .setPartitionIndex(tp0.partition)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(partition0Replicas)
+          .setZkVersion(0)
+          .setReplicas(partition0Replicas)
+          .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+
+    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(1))
+    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, Some(clientMetadata))
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NONE)
+
+    fetchResult = None
+    partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.empty())
+    fetchResult = doFetch(Request.OrdinaryConsumerId, partitionData, None)
+    assertTrue(fetchResult.isDefined)
+    assertEquals(fetchResult.get.error, Errors.NONE)
+  }
+
   /**
    * This method assumes that the test using created ReplicaManager calls
    * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing

