kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Only invoke hw update logic for follower fetches (#7064)
Date Wed, 10 Jul 2019 15:58:00 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df6efda  MINOR: Only invoke hw update logic for follower fetches (#7064)
df6efda is described below

commit df6efda1f2618c922821b14784857ab69c76c8d3
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Jul 10 08:57:34 2019 -0700

    MINOR: Only invoke hw update logic for follower fetches (#7064)
    
    We noticed a lot of messages like the following in the broker logs recently:
    ```
    [2019-07-10 02:01:23,946] WARN [ReplicaManager broker=0] While updating the HW for follower
-1 for partition connect-storage-topic-connect-cluster-0, the replica could not be found.
(kafka.server.ReplicaManager:70)
    ```
    In the KIP-392 PR, we added logic to track the high watermark of followers, but it is
invoked even for consumer fetches. This doesn't cause any harm other than all the log noise.
    
    This patch just adds the missing follower check.
    
    Reviewers: David Arthur <mumrah@gmail.com>
---
 core/src/main/scala/kafka/server/ReplicaManager.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index bfb50b8..c71e29c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -874,25 +874,26 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     // Wrap the given callback function with another function that will update the HW for
the remote follower
-    val updateHwAndThenCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit =
-      (fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]) => {
+    def maybeUpdateHwAndSendResponse(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]):
Unit = {
+      if (isFromFollower) {
         fetchPartitionData.foreach {
           case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
         }
-        responseCallback(fetchPartitionData)
       }
+      responseCallback(fetchPartitionData)
+    }
 
     // respond immediately if 1) fetch request does not want to wait
     //                        2) fetch request does not require any data
     //                        3) has enough data to respond
     //                        4) some error happens while reading data
-    //                        5) all the requested partitions need HW update
+    //                        5) any of the requested partitions need HW update
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData
|| anyPartitionsNeedHwUpdate) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
         tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset,
result.info.records,
           result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
       }
-      updateHwAndThenCallback(fetchPartitionData)
+      maybeUpdateHwAndSendResponse(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@@ -905,7 +906,7 @@ class ReplicaManager(val config: KafkaConfig,
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
isFromFollower,
         fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
-        updateHwAndThenCallback)
+        maybeUpdateHwAndSendResponse)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch
operation
       val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp)
}


Mime
View raw message