This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87cc31c KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998)
87cc31c is described below
commit 87cc31c4e7ea36e7e832a1d02d71480a91a75293
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Thu Dec 6 21:46:35 2018 +0800
KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998)
On the follower side, when the `LogAppendInfo` retrieved from the leader is empty, the wrong
lag is recorded in `fetcherLagStats` because `nextOffset` is computed as zero.
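The effect described above can be illustrated with a minimal, self-contained sketch. It is not the Kafka code: `AppendInfo` and `LagSketch` are hypothetical stand-ins, and it assumes that an empty append reports `lastOffset = -1`, which is what the statement that `nextOffset` is zero implies.

```scala
// Hypothetical, simplified stand-in for Kafka's LogAppendInfo (assumption, not the real class).
final case class AppendInfo(lastOffset: Long, validBytes: Int)

object LagSketch {
  // Behaviour before the fix: nextOffset is always lastOffset + 1,
  // so an empty append (lastOffset = -1, assumed) yields nextOffset = 0.
  def lagBefore(info: AppendInfo, highWatermark: Long): Long =
    math.max(0L, highWatermark - (info.lastOffset + 1))

  // Behaviour after the fix: fall back to the current fetch offset
  // when the append contained no valid bytes.
  def lagAfter(info: AppendInfo, fetchOffset: Long, highWatermark: Long): Long = {
    val nextOffset = if (info.validBytes > 0) info.lastOffset + 1 else fetchOffset
    math.max(0L, highWatermark - nextOffset)
  }
}
```

Under these assumptions, a follower that is fully caught up (fetch offset and high watermark both 100) and receives an empty append would report a lag of 100 with the old calculation and 0 with the new one.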
---
core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2cee83c..02158fa 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -272,10 +272,10 @@ abstract class AbstractFetcherThread(name: String,
partitionData)
logAppendInfoOpt.foreach { logAppendInfo =>
- val nextOffset = logAppendInfo.lastOffset + 1
+ val validBytes = logAppendInfo.validBytes
+ val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)
- val validBytes = logAppendInfo.validBytes
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData