kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) (#9423)
Date Wed, 02 Dec 2020 03:22:42 GMT
This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aeeb7b2  KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) (#9423)
aeeb7b2 is described below

commit aeeb7b2f9a9abe8f49543a2278757722e5974cb3
Author: Chia-Ping Tsai <chia7712@gmail.com>
AuthorDate: Wed Dec 2 11:21:28 2020 +0800

    KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) (#9423)
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 62 ++++++++++++-----------
 1 file changed, 32 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f6a9e83..a16a4c1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -704,7 +704,11 @@ class Partition(val topicPartition: TopicPartition,
         // check if the HW of the partition can now be incremented
         // since the replica may already be in the ISR and its LEO has just incremented
         val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.logEndOffset) {
-          leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
+          // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
+          // leaderIsrUpdateLock to prevent adding new hw to invalid log.
+          inReadLock(leaderIsrUpdateLock) {
+            leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
+          }
         } else {
           false
         }
@@ -865,41 +869,39 @@ class Partition(val topicPartition: TopicPartition,
    * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefor safe. We call
    * this set the "maximal" ISR. See KIP-497 for more details
    *
-   * Returns true if the HW was incremented, and false otherwise.
-   * Note There is no need to acquire the leaderIsrUpdate lock here
-   * since all callers of this private API acquire that lock
+   * Note There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
+   *
+   * @return true if the HW was incremented, and false otherwise.
    */
   private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
-    inReadLock(leaderIsrUpdateLock) {
-      // maybeIncrementLeaderHW is in the hot path, the following code is written to
-      // avoid unnecessary collection generation
-      var newHighWatermark = leaderLog.logEndOffsetMetadata
-      remoteReplicasMap.values.foreach { replica =>
-        // Note here we are using the "maximal", see explanation above
-        if (replica.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
-          (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || isrState.maximalIsr.contains(replica.brokerId))) {
-          newHighWatermark = replica.logEndOffsetMetadata
-        }
+    // maybeIncrementLeaderHW is in the hot path, the following code is written to
+    // avoid unnecessary collection generation
+    var newHighWatermark = leaderLog.logEndOffsetMetadata
+    remoteReplicasMap.values.foreach { replica =>
+      // Note here we are using the "maximal", see explanation above
+      if (replica.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
+        (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || isrState.maximalIsr.contains(replica.brokerId))) {
+        newHighWatermark = replica.logEndOffsetMetadata
       }
+    }
 
-      leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
-        case Some(oldHighWatermark) =>
-          debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
-          true
+    leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
+      case Some(oldHighWatermark) =>
+        debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
+        true
 
-        case None =>
-          def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
-            case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
-          }
+      case None =>
+        def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
+          case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
+        }
 
-          if (isTraceEnabled) {
-            val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata)).toSet
-            val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
-            trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
-              s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
-          }
-          false
-      }
+        if (isTraceEnabled) {
+          val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata)).toSet
+          val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
+          trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
+            s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
+        }
+        false
     }
   }
 


Mime
View raw message